Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/12/10 17:28:49 UTC

[GitHub] [iceberg] danielcweeks opened a new pull request #3711: Add FileIO implementation for Google Cloud Storage

danielcweeks opened a new pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711


   This PR adds native FileIO support for GCS using the Google Cloud Storage APIs directly.
   
   The read/write paths use streaming transfers and allow full configuration of the storage provider via a custom storage supplier. By default, a number of common configuration properties are provided via GCPProperties and can be used with a dynamically loaded configuration.
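The supplier-based configuration described in this PR can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: `FakeStorage` and `LazyClientHolder` are hypothetical stand-ins, and the real `Storage` client and `GCSFileIO` are deliberately not used so that the example runs with only the JDK. The idea is that a serializable supplier travels with the object, while the client itself is built lazily and is never serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for a storage client that cannot be serialized.
class FakeStorage {
  private final String projectId;

  FakeStorage(String projectId) {
    this.projectId = projectId;
  }

  String projectId() {
    return projectId;
  }
}

// A supplier that can be shipped to other processes together with its holder.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {
}

// Hypothetical holder: keeps the supplier and builds the client on first use.
class LazyClientHolder implements Serializable {
  private final SerializableSupplier<FakeStorage> supplier;
  private transient FakeStorage client; // skipped by Java serialization

  LazyClientHolder(SerializableSupplier<FakeStorage> supplier) {
    this.supplier = supplier;
  }

  synchronized FakeStorage client() {
    if (client == null) {
      client = supplier.get();
    }
    return client;
  }

  // Round-trips this holder through Java serialization; the copy has no client yet.
  LazyClientHolder copyViaSerialization() throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(this);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (LazyClientHolder) in.readObject();
    }
  }
}
```

A holder created with `new LazyClientHolder(() -> new FakeStorage("demo-project"))` does not call the supplier until `client()` is first invoked, and a deserialized copy would rebuild its own client from the same supplier.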


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767067387



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}

Review comment:
       I added a new link, but there doesn't appear to be (or at least I couldn't find) a canonical reference for this format. The updated docs should be clearer.
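As an illustration of the location format under discussion, here is a minimal sketch that splits a `gs://<bucket>/<object>` style location into its bucket and object name. It encodes an assumption about the general shape of such URIs and is not a reimplementation of `BlobId.fromGsUtilUri`; the `GsLocation` class and the sample path are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: illustrates the assumed gs://<bucket>/<object> shape only.
final class GsLocation {
  private static final Pattern GS_URI = Pattern.compile("^gs://([^/]+)/(.+)$");

  private final String bucket;
  private final String object;

  private GsLocation(String bucket, String object) {
    this.bucket = bucket;
    this.object = object;
  }

  // Splits the location at the first slash after the bucket name.
  static GsLocation parse(String location) {
    Matcher matcher = GS_URI.matcher(location);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Not a gs:// location: " + location);
    }
    return new GsLocation(matcher.group(1), matcher.group(2));
  }

  String bucket() {
    return bucket;
  }

  String object() {
    return object;
  }
}
```

Under this assumption, `gs://my-bucket/warehouse/db/table/data.parquet` would resolve to bucket `my-bucket` and object `warehouse/db/table/data.parquet`, while a location with a different scheme would be rejected.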






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767340735



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')

Review comment:
       Use `versions.props` for the version number. Also, if we use that, I think we don't need to load the BOM?






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767068170



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       I guess I should clarify that it is valid to use a negative number to effectively say "use the min buffer size allowed". While that may be bad practice, I think adding additional checks here is unnecessary.
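To illustrate the point about negative values, the sketch below shows one way a writer could normalize a requested chunk size: round it down to a multiple of a minimum and never go below that minimum. The 256 KiB minimum and the `ChunkSizes` class are assumptions made for this example; the actual behavior of `WriteChannel.setChunkSize` should be checked against the client library.

```java
// Hypothetical helper; the minimum below is an assumption for illustration.
final class ChunkSizes {
  static final int MIN_CHUNK_SIZE = 256 * 1024;

  private ChunkSizes() {
  }

  // Rounds down to a multiple of the minimum, then clamps to at least the minimum.
  static int normalize(int requested) {
    int roundedDown = (requested / MIN_CHUNK_SIZE) * MIN_CHUNK_SIZE;
    return Math.max(MIN_CHUNK_SIZE, roundedDown);
  }
}
```

With a normalization like this, zero or any negative request ends up at the minimum size, which is why an explicit `size > 0` check would not be strictly required for correctness.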






[GitHub] [iceberg] kbendick commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767057016



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'

Review comment:
       Is it possible to add this as a testImplementation to all sub-projects (could be a follow up)?






[GitHub] [iceberg] danielcweeks commented on pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#issuecomment-993755825


   @jackye1995 Thanks for pointing out the thread; I followed up there. For now, we don't really need to add this to the runtime, so I think what we have here is fine. We can always follow up with bundle changes if the thinking on that changes, but I wouldn't want to push that with this PR.




[GitHub] [iceberg] jackye1995 commented on pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#issuecomment-993054258


   @danielcweeks could you provide some insight into the current thread on the dev list about the bundled runtime? @openinx is starting a vote. I approved this PR assuming we will not include this in the runtime, but if we go with his option 2 we will need to mark the Google dependencies as compile only.




[GitHub] [iceberg] jackye1995 commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767040843



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));

Review comment:
       nit: redundant parentheses around `(key)` and `(userProject)`
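For reference, Java allows the parentheses around a single, untyped lambda parameter to be dropped, which is what this nit refers to. The sketch below uses `Optional.ifPresent` with both forms; the `LambdaParens` class and the option strings are made up for illustration and do not use the GCS `BlobWriteOption` type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical example class; the strings stand in for real write options.
final class LambdaParens {
  private LambdaParens() {
  }

  static List<String> collect(Optional<String> key, Optional<String> userProject) {
    List<String> options = new ArrayList<>();
    // Parenthesized single parameter: legal, but the parentheses are optional.
    key.ifPresent((k) -> options.add("encryptionKey=" + k));
    // Equivalent form without the parentheses.
    userProject.ifPresent(p -> options.add("userProject=" + p));
    return options;
  }
}
```

Both lambdas compile to the same kind of `Consumer<String>`, so the change is purely stylistic.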

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       Can we add some basic configuration checks, such as checking that the size is > 0 if present?

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}

Review comment:
       could you add an example or GCS link of the accepted location format? https://googleapis.dev/java/google-cloud-storage/latest/com/google/cloud/storage/BlobId.html#fromGsUtilUri-java.lang.String- does not really provide any information.

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  private transient Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new GCSInputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new GCSOutputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {

Review comment:
       under what condition would it return false instead of throwing an exception?
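For context on this question: the `Storage.delete` Javadoc describes the return value as `true` if the blob was deleted and `false` if it was not found, with other failures reported as exceptions. The sketch below mimics that contract with a hypothetical in-memory store (`InMemoryStore` and `LenientDeleter` are invented for this example and are not the GCS client) to show a "report and continue" delete.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an object store; not the GCS client.
final class InMemoryStore {
  private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();

  void put(String name, byte[] data) {
    blobs.put(name, data);
  }

  // Returns true if the blob existed and was removed, false if it was missing.
  // Invalid input is treated as a failure and reported with an exception.
  boolean delete(String name) {
    if (name == null || name.isEmpty()) {
      throw new IllegalArgumentException("Blob name must not be empty");
    }
    return blobs.remove(name) != null;
  }
}

final class LenientDeleter {
  private LenientDeleter() {
  }

  // A missing blob is reported in the returned message rather than treated as fatal.
  static String deleteFile(InMemoryStore store, String name) {
    return store.delete(name) ? "deleted " + name : "not found, skipped " + name;
  }
}
```

In this sketch a second delete of the same name yields the "not found" message instead of an error, which matches the lenient handling the code comment in the diff describes.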

##########
File path: gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Random;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+
+public class GCSFileIOTest {
+  private static final String TEST_BUCKET = "TEST_BUCKET";
+  private final Random random = new Random(1);
+
+  Storage storage = LocalStorageHelper.getOptions().getService();

Review comment:
       Can these variables be private? The same applies to other similar places in the tests.

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class GCSOutputFile extends BaseGCSFile implements OutputFile {
+  public static GCSOutputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {

Review comment:
       I don't see this (and the same one in input file) used anywhere, what's its purpose?






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r768061864



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'

Review comment:
       Let's keep that as a follow up since it's a small change and easier to review separately.






[GitHub] [iceberg] danielcweeks merged pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks merged pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711


   




[GitHub] [iceberg] jackye1995 commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767339675



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  private transient Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new GCSInputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new GCSOutputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {

Review comment:
       I see. I was curious under what condition it would return false instead of throwing. The documentation says:
   
   ```
   boolean delete(BlobId blob)
   
   Deletes the requested blob.
   
   Example of deleting a blob.
   
   
    String bucketName = "my-unique-bucket";
    String blobName = "my-blob-name";
    BlobId blobId = BlobId.of(bucketName, blobName);
    boolean deleted = storage.delete(blobId);
    if (deleted) {
      // the blob was deleted
    } else {
      // the blob was not found
    }
    
   
   Returns:
       true if blob was deleted, false if it was not found
   ```
   
   So I think we can be a bit more explicit in the logging, something like "Fail to delete file, not found: %s"
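
A minimal sketch of the more explicit logging suggested here. It is self-contained, so the `Predicate<String>` is only a stand-in for `storage.delete(blobId)` (true if deleted, false if not found), and the class and method names are hypothetical. Only the message text comes from the suggestion above.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical illustration; not the code in this PR.
final class DeleteLoggingSketch {
  // Message text taken from the review suggestion.
  static final String NOT_FOUND_MESSAGE = "Fail to delete file, not found: %s";

  /**
   * Runs the delete and returns the warning that would be logged,
   * or an empty Optional when the blob was deleted.
   *
   * @param delete stand-in for storage.delete(blobId)
   * @param path location of the blob to delete
   */
  static Optional<String> deleteAndDescribe(Predicate<String> delete, String path) {
    if (!delete.test(path)) {
      // A false result means "not found", so say so instead of a generic failure.
      return Optional.of(String.format(NOT_FOUND_MESSAGE, path));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Simulate a store in which the blob does not exist.
    deleteAndDescribe(p -> false, "gs://bucket/missing.avro").ifPresent(System.out::println);
  }
}
```

In the real class the returned text would presumably go to `LOG.warn` rather than being returned to the caller.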






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767061495



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       I feel like we shouldn't be validating on top of the channel.  Also, knowing a little bit about how the implementation works, this value is more of a hint than an exact setting: the channel has a minimum size it will use regardless of what you set.
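
To illustrate why a requested chunk size behaves like a hint, here is a self-contained sketch. It is not the client library's code: the 256 KiB minimum and the round-down-to-a-multiple behaviour are assumptions that should be checked against the google-cloud-storage version in use, and the class and method names are hypothetical.

```java
// Hypothetical illustration of a channel that treats the chunk size as a hint.
final class ChunkSizeSketch {
  // Assumed minimum and granularity (256 KiB); verify against the actual library.
  static final int ASSUMED_MIN_CHUNK_SIZE = 256 * 1024;

  /** Rounds the request down to a multiple of the minimum, never going below the minimum. */
  static int effectiveChunkSize(int requested) {
    int rounded = (requested / ASSUMED_MIN_CHUNK_SIZE) * ASSUMED_MIN_CHUNK_SIZE;
    return Math.max(ASSUMED_MIN_CHUNK_SIZE, rounded);
  }

  public static void main(String[] args) {
    System.out.println(effectiveChunkSize(1));            // a tiny request is raised to the minimum
    System.out.println(effectiveChunkSize(300 * 1024));   // rounded down to one minimum-sized chunk
    System.out.println(effectiveChunkSize(1024 * 1024));  // already a multiple, so unchanged
  }
}
```

Under these assumptions a zero or negative value would also end up at the minimum, which is one reason an extra check on top of the channel may add little.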






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767339675



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  private transient Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new GCSInputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new GCSOutputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {

Review comment:
       I see. I was curious under what condition it would return false instead of throwing. The documentation says:
   
   ```
   boolean delete(BlobId blob)
   
   Deletes the requested blob.
   
   Example of deleting a blob.
   
   
    String bucketName = "my-unique-bucket";
    String blobName = "my-blob-name";
    BlobId blobId = BlobId.of(bucketName, blobName);
    boolean deleted = storage.delete(blobId);
    if (deleted) {
      // the blob was deleted
    } else {
      // the blob was not found
    }
    
   
   Returns:
       true if blob was deleted, false if it was not found
   ```
   
   So I think we can be a bit more explicit in the logging, something like "Fail to delete path, not found: %s"






[GitHub] [iceberg] rdblue commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767318223



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'

Review comment:
       I'm fine with it.






[GitHub] [iceberg] kbendick commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767057016



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'

Review comment:
       Is it possible to add `assertj-core` as a `testImplementation` to all sub-projects (could be a follow-up)?






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767067180



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'

Review comment:
       There was a discussion about this previously and I'm not sure where it landed, but some subprojects are using it now while others are not.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r768061026



##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')

Review comment:
       Good catch.  As for the BOM, I'm not super clear on everything it provides, but I think it ensures that all of the required dependencies for the Google libraries stay in sync.  Given the set of dependencies required here, it might be best to adhere to the BOM (unless we find significant conflicts with the Spark libs).






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767040843



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));

Review comment:
       nit: redundant brackets around `(key)` and `(userProject)`
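
For reference, a small self-contained sketch of the same pattern with the parentheses dropped. The strings stand in for `BlobWriteOption` values so that the example needs no GCS dependency; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of single-parameter lambdas without parentheses.
final class LambdaParensSketch {
  static List<String> collectOptions(Optional<String> encryptionKey, Optional<String> userProject) {
    List<String> options = new ArrayList<>();
    // "key ->" instead of "(key) ->": parentheses are optional for one untyped parameter.
    encryptionKey.ifPresent(key -> options.add("encryptionKey=" + key));
    userProject.ifPresent(project -> options.add("userProject=" + project));
    return options;
  }

  public static void main(String[] args) {
    System.out.println(collectOptions(Optional.of("k1"), Optional.empty()));
  }
}
```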

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       Can we add some basic configuration checks, such as verifying that the size is > 0 if present?
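
A minimal sketch of the kind of check meant here, assuming the chunk size is exposed as an `Optional<Integer>` as in the `ifPresent` call above. The class name, method name, and error message are hypothetical, and plain `IllegalArgumentException` is used to keep the example free of dependencies.

```java
import java.util.Optional;

// Hypothetical illustration of validating an optional configuration value.
final class ChunkSizeCheckSketch {
  /** Returns the value unchanged, rejecting a present value that is not positive. */
  static Optional<Integer> validated(Optional<Integer> chunkSize) {
    chunkSize.ifPresent(size -> {
      if (size <= 0) {
        throw new IllegalArgumentException("Invalid channel write chunk size (must be > 0): " + size);
      }
    });
    return chunkSize;
  }

  public static void main(String[] args) {
    System.out.println(validated(Optional.of(1024 * 1024)).isPresent());
    System.out.println(validated(Optional.empty()).isPresent());
  }
}
```

An absent value passes through untouched, so the channel default still applies when nothing is configured.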

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}

Review comment:
       Could you add an example of, or a GCS link to, the accepted location format? https://googleapis.dev/java/google-cloud-storage/latest/com/google/cloud/storage/BlobId.html#fromGsUtilUri-java.lang.String- does not really provide any information.
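
As an illustration of what such an example could show, here is a self-contained sketch of how a gsutil-style location such as `gs://my-bucket/path/to/data.parquet` splits into a bucket and a blob name. It only approximates the format; the authoritative behaviour is whatever `BlobId.fromGsUtilUri` does, and the class and method names here are hypothetical.

```java
// Hypothetical illustration of the gs://<bucket>/<blob-name> location format.
final class GsLocationSketch {
  private static final String PREFIX = "gs://";

  /** Splits a location into {bucket, blobName}; rejects anything without the gs:// prefix or a blob part. */
  static String[] parse(String location) {
    if (!location.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Not a gs:// location: " + location);
    }
    int slash = location.indexOf('/', PREFIX.length());
    if (slash < 0) {
      throw new IllegalArgumentException("Missing blob name: " + location);
    }
    String bucket = location.substring(PREFIX.length(), slash);
    String blobName = location.substring(slash + 1);
    return new String[] {bucket, blobName};
  }

  public static void main(String[] args) {
    String[] parts = parse("gs://my-bucket/path/to/data.parquet");
    System.out.println("bucket=" + parts[0] + ", blob=" + parts[1]);
  }
}
```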

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  private transient Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new GCSInputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new GCSOutputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {

Review comment:
       Under what condition would it return false instead of throwing an exception?

##########
File path: gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Random;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+
+public class GCSFileIOTest {
+  private static final String TEST_BUCKET = "TEST_BUCKET";
+  private final Random random = new Random(1);
+
+  Storage storage = LocalStorageHelper.getOptions().getService();

Review comment:
       Can these variables be private? The same applies to similar places in the tests.

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class GCSOutputFile extends BaseGCSFile implements OutputFile {
+  public static GCSOutputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {

Review comment:
       I don't see this (or its counterpart in the input file) used anywhere. What is its purpose?






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767067029



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class GCSOutputFile extends BaseGCSFile implements OutputFile {
+  public static GCSOutputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {

Review comment:
       Yeah, sorry, those should be called from GCSFileIO instead of using `new`.






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767059416



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  private transient Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new GCSInputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new GCSOutputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {

Review comment:
       Throwing an exception would indicate some sort of request/transport error.  This is actually similar to how S3FileIO works in that failure to delete does not throw.  
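
The distinction above can be sketched as follows. This is a minimal illustration, not the code in this PR: `BlobDeleter` and `DeleteSketch` are hypothetical names, and the sketch assumes the client returns false when nothing was deleted and throws only on request/transport errors. It returns the warning text instead of logging it so the behavior is easy to check.

```java
import java.util.Set;

// Hypothetical stand-in for a storage client; not the real GCS API.
interface BlobDeleter {
  // Returns true if the object was deleted, false if nothing was deleted.
  // Assumed to throw a RuntimeException only for request/transport errors.
  boolean delete(String location);
}

final class DeleteSketch {
  private DeleteSketch() {
  }

  // A false result is reported rather than thrown.
  // Returns the warning message, or null when the delete succeeded.
  static String deleteFile(BlobDeleter client, String location) {
    if (!client.delete(location)) {
      return "Failed to delete path: " + location;
    }
    return null;
  }

  // In-memory deleter used only to exercise the sketch.
  static BlobDeleter inMemory(Set<String> objects) {
    return objects::remove;
  }
}
```

Deleting the same location twice yields no message the first time and a warning the second time, without an exception in either case.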






[GitHub] [iceberg] danielcweeks commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767059416



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}
+ */
+public class GCSFileIO implements FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSFileIO.class);
+
+  private SerializableSupplier<Storage> storageSupplier;
+  private GCPProperties gcpProperties;
+  private transient Storage storage;
+  private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+
+  /**
+   * No-arg constructor to load the FileIO dynamically.
+   * <p>
+   * All fields are initialized by calling {@link GCSFileIO#initialize(Map)} later.
+   */
+  public GCSFileIO() {
+  }
+
+  /**
+   * Constructor with custom storage supplier and GCP properties.
+   * <p>
+   * Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this constructor.
+   *
+   * @param storageSupplier storage supplier
+   * @param gcpProperties gcp properties
+   */
+  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
+    this.storageSupplier = storageSupplier;
+    this.gcpProperties = gcpProperties;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new GCSInputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new GCSOutputFile(client(), BlobId.fromGsUtilUri(path), gcpProperties);
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    // There is no specific contract about whether delete should fail
+    // and other FileIO providers ignore failure.  Log the failure for
+    // now as it is not a required operation for Iceberg.
+    if (!client().delete(BlobId.fromGsUtilUri(path))) {

Review comment:
       Throwing an exception would indicate some sort of request/transport error.  This is actually similar to how S3FileIO works in that failure to delete does not throw.  

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       I feel like we shouldn't be validating on top of the channel.  Also, knowing a little bit about how the implementation works, this is more of a suggestion than an explicit setting.  The channel has a minimum size it will use regardless of what you set.

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class GCSOutputFile extends BaseGCSFile implements OutputFile {
+  public static GCSOutputFile fromLocation(String location, Storage storage, GCPProperties gcpProperties) {

Review comment:
       Yeah, sorry, those should be called from GCSFileIO instead of using `new`.

##########
File path: build.gradle
##########
@@ -354,6 +354,30 @@ project(':iceberg-aws') {
   }
 }
 
+project(':iceberg-gcp') {
+  dependencies {
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    api project(':iceberg-api')
+    implementation project(':iceberg-common')
+    implementation project(':iceberg-core')
+
+    implementation platform('com.google.cloud:libraries-bom:24.1.0')
+    implementation 'com.google.cloud:google-cloud-storage'
+
+    testImplementation 'com.google.cloud:google-cloud-nio'
+    testImplementation 'org.assertj:assertj-core'

Review comment:
       There was a discussion about this previously and I'm not sure where it landed, but some subprojects are using it now while others are not.

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FileIO Implementation backed by Google Cloud Storage (GCS)
+ * <p>
+ * Locations follow the conventions used by
+ * {@link com.google.cloud.storage.BlobId#fromGsUtilUri(String) BlobId.fromGsUtilUri}

Review comment:
       I added a new link, but there doesn't appear to be (or at least I couldn't find) a canonical reference for this format.  Updated docs should be more clear.
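
Since no canonical reference for the location format was found, a rough sketch of the shape assumed here may help. It assumes locations look like `gs://<bucket>/<object name>`; `GsUriSketch` is a hypothetical helper written for illustration and is not the library's parser, which may accept or reject inputs differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class GsUriSketch {
  // Assumed shape: gs://<bucket>/<object name>, where the bucket has no slash.
  private static final Pattern GS_URI = Pattern.compile("gs://([^/]+)/(.+)");

  private GsUriSketch() {
  }

  // Returns {bucket, objectName}; rejects anything that does not match the assumed shape.
  static String[] parse(String location) {
    Matcher matcher = GS_URI.matcher(location);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Not a gs:// location: " + location);
    }
    return new String[] {matcher.group(1), matcher.group(2)};
  }
}
```

Under this assumption, `gs://bucket/path/to/file.parquet` splits into the bucket `bucket` and the object name `path/to/file.parquet`.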

##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       I guess I should clarify that it is valid to use a negative number to effectively say "use the min buffer size allowed".  While that may be bad practice, I think adding additional checks here is unnecessary.
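
As a rough sketch of why extra validation adds little, the behavior described above amounts to treating the configured value as a hint. `ChunkSizeSketch` and the 256 KiB minimum are assumptions for illustration only; the real channel defines its own constant and may also round the value.

```java
final class ChunkSizeSketch {
  // Assumed minimum for illustration; the real channel defines its own constant.
  static final int ASSUMED_MIN_CHUNK_SIZE = 256 * 1024;

  private ChunkSizeSketch() {
  }

  // Treats the requested size as a hint: anything below the minimum,
  // including zero or negative values, falls back to the minimum.
  static int effectiveChunkSize(int requested) {
    return Math.max(ASSUMED_MIN_CHUNK_SIZE, requested);
  }
}
```

With this model a negative value simply selects the minimum, so rejecting it up front would only duplicate what the channel already handles.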






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3711: Add FileIO implementation for Google Cloud Storage

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3711:
URL: https://github.com/apache/iceberg/pull/3711#discussion_r767339259



##########
File path: gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputStream.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.api.client.util.Lists;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The GCSOutputStream leverages native streaming channels from the GCS API
+ * for streaming uploads. See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
+ */
+class GCSOutputStream extends PositionOutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputStream.class);
+
+  private final StackTraceElement[] createStack;
+  private final Storage storage;
+  private final BlobId blobId;
+  private final GCPProperties gcpProperties;
+
+  private OutputStream stream;
+
+  private long pos = 0;
+  private boolean closed = false;
+
+  GCSOutputStream(Storage storage, BlobId blobId, GCPProperties gcpProperties) throws IOException {
+    this.storage = storage;
+    this.blobId = blobId;
+    this.gcpProperties = gcpProperties;
+
+    createStack = Thread.currentThread().getStackTrace();
+
+    openStream();
+  }
+
+  @Override
+  public long getPos() {
+    return pos;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    stream.write(b);
+    pos += 1;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    stream.write(b, off, len);
+    pos += len;
+  }
+
+  private void openStream() {
+    List<BlobWriteOption> writeOptions = Lists.newArrayList();
+
+    gcpProperties.encryptionKey().ifPresent(
+        (key) -> writeOptions.add(BlobWriteOption.encryptionKey(key)));
+    gcpProperties.userProject().ifPresent(
+        (userProject) -> writeOptions.add(BlobWriteOption.userProject(userProject)));
+
+    WriteChannel channel = storage.writer(BlobInfo.newBuilder(blobId).build(),
+        writeOptions.toArray(new BlobWriteOption[0]));
+
+    gcpProperties.channelWriteChunkSize().ifPresent(channel::setChunkSize);

Review comment:
       I see, thanks for the explanation!



