You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "ZENOTME (via GitHub)" <gi...@apache.org> on 2024/04/24 17:16:18 UTC

[PR] feat: support append data file and add e2e test [iceberg-rust]

ZENOTME opened a new pull request, #349:
URL: https://github.com/apache/iceberg-rust/pull/349

   This PR is complete https://github.com/apache/iceberg-rust/issues/345. 
   1. It adds the MergeSnapshotAction to commit the data file 
   2. It init the e2e test for write and read


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580444775


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),
+            next_seq_num,
+        );
+        manifest_list_writer.add_manifests(manifest_files.into_iter())?;
+        manifest_list_writer.close().await?;
+
+        let new_snapshot = Snapshot::builder()
+            .with_manifest_list(manifest_list_path)
+            .with_snapshot_id(self.snapshot_id)
+            .with_parent_snapshot_id(self.parent_snapshot_id)
+            .with_sequence_number(next_seq_num)
+            .with_summary(summary)
+            .with_schema_id(self.schema_id)
+            .with_timestamp_ms(commit_ts)
+            .build();
+
+        let new_snapshot_id = new_snapshot.snapshot_id();
+        self.tx.append_updates(vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    new_snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ])?;
+        self.tx.append_requirements(vec![
+            TableRequirement::UuidMatch {
+                uuid: self.tx.table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: self.parent_snapshot_id,

Review Comment:
   > This will work for now but might get problematic later on. Just a heads up.
   
   I'm not sure whether my understanding of "problematic"  is correct. Did you mean that this problem is that if we apply two append actions, like
   ```
   1: TableRequirement::UuidMatch
       TableRequirement::RefSnapshotIdMatch (parent_snapshot_id)
   2. TableRequirement::UuidMatch
      TableRequirement::RefSnapshotIdMatch (parent_snapshot_id)
   ```
   The requirement of last one will fail, because the previous update will change snapshot id so that the last validation will fail. So when we apply, we should return a tx can reflect the update of snapshot so that we can stack the update together.🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1582058099


##########
crates/e2e_test/tests/append_data_file_test.rs:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {

Review Comment:
   possible refactor (if we'll use e2e_test for all integrations e.g. hive, glue, etc.):
   - relocate `set_test_fixture` into /test_utils/
   - run & expose all services (or allow for configurion param (enum) which services to set up



##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:

Review Comment:
   I'll guess once this is merged when can reuse this and extend with images for `Glue` and `HiveMetastore` as well - this would remove duplicates from the specific catalog impl crates and the datafusion/integration crate and avoids having each crate run & define same testing infr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579368639


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog

Review Comment:
   There is a typo in this. I think this line can go.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2108162712

   Hi, I have tried to fix this PR. Some things may not be fixed well now:
   1. https://github.com/apache/iceberg-rust/pull/349#discussion_r1580444775
     I'm not sure whether my understanding is correct 
   2. todo, we can do them in later PR:
   - https://github.com/apache/iceberg-rust/pull/349#discussion_r1580446821
   - https://github.com/apache/iceberg-rust/pull/349#discussion_r1579420662
   3. https://github.com/apache/iceberg-rust/pull/349#discussion_r1580571634
   Please let me know if there are other things I miss and need to fix. cc @Fokko 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1595472104


##########
crates/e2e_test/tests/append_data_file_test.rs:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {

Review Comment:
   LGTM. We can do this later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579369755


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181
+
+  minio:

Review Comment:
   It doesn't boot on my end:
   
   ```
   ➜  iceberg-rust git:(tx_append) docker logs -f d7a12d1f9d30
   Formatting 1st pool, 1 set(s), 1 drives per set.
   WARNING: Host local has more than 0 drives of set. A host failure will result in data becoming unavailable.
   MinIO Object Storage Server
   Copyright: 2015-2024 MinIO, Inc.
   License: GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
   Version: RELEASE.2024-03-07T00-43-48Z (go1.21.8 linux/arm64)
   
   Status:         1 Online, 0 Offline. 
   S3-API: http://172.20.0.2:9000  http://127.0.0.1:9000     
   Console: http://172.20.0.2:9001 http://127.0.0.1:9001   
   
   Documentation: https://min.io/docs/minio/linux/index.html
   Warning: The standard parity is set to 0. This can lead to data loss.
   
   API: ListObjectsV2(bucket=icebergdata)
   Time: 12:11:34 UTC 04/25/2024
   DeploymentID: 0d2c88aa-2393-4c17-a28c-560e6cfe4b9b
   RequestID: 17C984C03C8E1AFA
   RemoteHost: 172.20.0.3
   Host: minio:9000
   UserAgent: MinIO (linux; arm64) minio-go/v7.0.67 mc/RELEASE.2024-03-07T00-31-49Z
   Error: volume not found (cmd.StorageErr)
          7: internal/logger/logonce.go:118:logger.(*logOnceType).logOnceIf()
          6: internal/logger/logonce.go:149:logger.LogOnceIf()
          5: cmd/erasure-server-pool.go:1516:cmd.(*erasureServerPools).ListObjects()
          4: cmd/erasure-server-pool.go:1275:cmd.(*erasureServerPools).ListObjectsV2()
          3: cmd/bucket-listobjects-handlers.go:210:cmd.objectAPIHandlers.listObjectsV2Handler()
          2: cmd/bucket-listobjects-handlers.go:156:cmd.objectAPIHandlers.ListObjectsV2Handler()
          1: net/http/server.go:2136:http.HandlerFunc.ServeHTTP()
   
    You are running an older version of MinIO released 1 month before the latest release 
    Update: Run `mc admin update ALIAS` 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579449633


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),
+            next_seq_num,
+        );
+        manifest_list_writer.add_manifests(manifest_files.into_iter())?;
+        manifest_list_writer.close().await?;
+
+        let new_snapshot = Snapshot::builder()
+            .with_manifest_list(manifest_list_path)
+            .with_snapshot_id(self.snapshot_id)
+            .with_parent_snapshot_id(self.parent_snapshot_id)
+            .with_sequence_number(next_seq_num)
+            .with_summary(summary)
+            .with_schema_id(self.schema_id)
+            .with_timestamp_ms(commit_ts)
+            .build();
+
+        let new_snapshot_id = new_snapshot.snapshot_id();
+        self.tx.append_updates(vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    new_snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ])?;
+        self.tx.append_requirements(vec![
+            TableRequirement::UuidMatch {
+                uuid: self.tx.table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: self.parent_snapshot_id,

Review Comment:
   This will work for now but might get problematic later on. Just a heads up.
   
   An important concept for Iceberg is to stack snapshots in a single commit. For example, now with append being added in this PR, we can easily add support for truncate. This would be a delete operation where all the data is being dropped, and then just an append.



##########
crates/iceberg/src/transaction.rs:
##########
@@ -352,6 +682,83 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_merge_snapshot_actio() {

Review Comment:
   ```suggestion
       async fn test_merge_snapshot_action() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579004695


##########
crates/iceberg/src/io.rs:
##########
@@ -368,6 +368,9 @@ impl Storage {
         new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
         match scheme {
+            Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   I try to avoid adding code just for the sake of testing :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1598744515


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),

Review Comment:
   I have fixed it in the `fix None case for parant_snapshot_id` commit. When the parent_snapshot_id is none, we skip to add the snapshot field in manifest list metadata. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579375646


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181

Review Comment:
   I would love to run this from my IDE as well:
   ```suggestion
       ports:
         - 8181:8181
       expose:
         - 8181
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579466191


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())

Review Comment:
   How do we know if the written data adheres to the partition spec?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579372414


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181
+
+  minio:
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    expose:
+      - 9001
+      - 9000

Review Comment:
   How about also exposing the management console:
   ```suggestion
       ports:
         - 9001:9001
       expose:
         - 9001
         - 9000
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1578812779


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),

Review Comment:
   Can we treat the parent_snapshot_id as 0 if there is no parent snapshot? This parent_snapshot_id will be written in metadata of manifest list in https://github.com/apache/iceberg-rust/blob/aba620900e99423bbd3fed969618e67e58a03a7b/crates/iceberg/src/spec/manifest_list.rs#L110
   
   cc @Fokko 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1578811481


##########
crates/iceberg/src/io.rs:
##########
@@ -368,6 +368,9 @@ impl Storage {
         new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
         match scheme {
+            Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   To enable writing for unit tests, I add a schema for memory but consider it as LocalFs Storage, do we need to add a storage type for storage? cc @liurenjie1024 @Xuanwo 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579412576


##########
crates/iceberg/src/transaction.rs:
##########
@@ -95,6 +104,42 @@ impl<'a> Transaction<'a> {
         Ok(self)
     }
 
+    /// Creates a fast append action.
+    pub fn fast_append(
+        self,
+        commit_uuid: Option<String>,
+        key_metadata: Vec<u8>,
+    ) -> Result<FastAppendAction<'a>> {
+        let parent_snapshot_id = self
+            .table
+            .metadata()
+            .current_snapshot()
+            .map(|s| s.snapshot_id());
+        let snapshot_id = parent_snapshot_id.map(|id| id + 1).unwrap_or(0);

Review Comment:
   The snapshot ID is a random int, and it should be checked if it hasn't been used before:
   
   https://github.com/apache/iceberg-python/blob/f72e363b18baa181c998bbdef657982159a22d48/pyiceberg/table/metadata.py#L315-L326



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1578992843


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),

Review Comment:
   The snapshot-id is optional:
   
   ![image](https://github.com/apache/iceberg-rust/assets/1134248/2eebae01-c60b-43c3-ab51-ce46f7236479)
   
   Older versions of Java would write `-1` as the default snapshot ID if there was no snapshot, but this is actually wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579426812


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181
+
+  minio:

Review Comment:
   I'm not sure if the error is relevant. I'm able to run it locally now.
   
   > how about change `minio/minio:RELEASE.2024-03-07T00-43-48Z`, `minio/mc:RELEASE.2024-03-07T00-31-49Z` to `minio/minio:latest`, `minio/mc:latest`.
   
   +1 on that. I haven't encountered any issues with minio updates



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579004695


##########
crates/iceberg/src/io.rs:
##########
@@ -368,6 +368,9 @@ impl Storage {
         new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
         match scheme {
+            Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   I try to avoid adding code just for the sake of testing, inherently you're testing a different path than it would normally would.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1598713751


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())

Review Comment:
   I add the https://github.com/apache/iceberg-rust/blob/950c40edd751b307ee98b7a76f4242bfaef87d86/crates/iceberg/src/transaction.rs#L216 for this 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#issuecomment-2076295905

   cc @liurenjie1024 @Fokko @Xuanwo 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579402269


##########
crates/e2e_test/tests/append_data_file_test.rs:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    // Start docker compose
+    docker_compose.run();
+
+    let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s rest catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let container_ip = docker_compose.get_container_ip("minio");
+    let read_port = format!("{}:{}", container_ip, 9000);
+
+    let config = RestCatalogConfig::builder()
+        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+        .props(HashMap::from([
+            (S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+            (S3_REGION.to_string(), "us-east-1".to_string()),
+        ]))
+        .build();
+    let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        rest_catalog,
+    }
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+    let fixture = set_test_fixture("test_create_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(

Review Comment:
   Double checked if the schema produces the field-IDs:
   
   ```
   parq test-00000.parquet --schema
   
   # Schema 
   <pyarrow._parquet.ParquetSchema object at 0x12209ef00>
   required group field_id=-1 arrow_schema {
     optional binary field_id=1 foo (String);
     required int32 field_id=2 bar;
     optional boolean field_id=3 baz;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579406252


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181
+
+  minio:
+    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    expose:
+      - 9001
+      - 9000

Review Comment:
   LGTM. I also love to expose to make it easier to debug.🤣



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579520268


##########
crates/e2e_test/tests/append_data_file_test.rs:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    // Start docker compose
+    docker_compose.run();
+
+    let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s rest catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let container_ip = docker_compose.get_container_ip("minio");
+    let read_port = format!("{}:{}", container_ip, 9000);
+
+    let config = RestCatalogConfig::builder()
+        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+        .props(HashMap::from([
+            (S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+            (S3_REGION.to_string(), "us-east-1".to_string()),
+        ]))
+        .build();
+    let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        rest_catalog,
+    }
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+    let fixture = set_test_fixture("test_create_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        schema.clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
+    let mut data_file_writer = data_file_writer_builder
+        .build(DataFileWriterConfig::new(None))
+        .await
+        .unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(col1) as ArrayRef,
+            Arc::new(col2) as ArrayRef,
+            Arc::new(col3) as ArrayRef,
+        ],
+    )
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();

Review Comment:
   I noticed that we don't compute the upper- and lower bounds:
   
   ```json
           "lower_bounds": {
               "array": []
           },
           "upper_bounds": {
               "array": []
           },
   ```
   
   This diminishes the value of Iceberg quite a bit since we cannot do any file skipping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580571634


##########
crates/iceberg/src/io.rs:
##########
@@ -368,6 +368,9 @@ impl Storage {
         new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
 
         match scheme {
+            Scheme::Memory => Ok(Self::LocalFs {

Review Comment:
   >  inherently you're testing a different path than it would normally would.
   
   Yes, but we only limit it to the storage layer. It can provide a writable mock layer for unit tests which can make us test the process logic and internal state in units easier. Otherwise, we only can test this in e2e test. And the problem with e2e test is that we can't check the internal state. 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1580446821


##########
crates/e2e_test/tests/append_data_file_test.rs:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    // Start docker compose
+    docker_compose.run();
+
+    let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s rest catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let container_ip = docker_compose.get_container_ip("minio");
+    let read_port = format!("{}:{}", container_ip, 9000);
+
+    let config = RestCatalogConfig::builder()
+        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+        .props(HashMap::from([
+            (S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
+            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
+            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
+            (S3_REGION.to_string(), "us-east-1".to_string()),
+        ]))
+        .build();
+    let rest_catalog = RestCatalog::new(config).await.unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        rest_catalog,
+    }
+}
+
+#[tokio::test]
+async fn test_append_data_file() {
+    let fixture = set_test_fixture("test_create_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        schema.clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
+    let mut data_file_writer = data_file_writer_builder
+        .build(DataFileWriterConfig::new(None))
+        .await
+        .unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(col1) as ArrayRef,
+            Arc::new(col2) as ArrayRef,
+            Arc::new(col3) as ArrayRef,
+        ],
+    )
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();

Review Comment:
   Yes, I will complete them later in another PR. 
   https://github.com/apache/iceberg-rust/blob/aba620900e99423bbd3fed969618e67e58a03a7b/crates/iceberg/src/writer/file_writer/parquet_writer.rs#L188



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579420662


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),

Review Comment:
   The summary generation is missing here. I'm okay with doing that in a separate PR, but we have to make sure that we add it before the release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579472707


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)

Review Comment:
   This is different for V1 and V2. For V1 we do want to write this, for V2 typically not. This is because when the commit fails due to a conflict, we have to rewrite the manifest-list but can-reuse the already written manifests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579449633


##########
crates/iceberg/src/transaction.rs:
##########
@@ -121,6 +166,270 @@ impl<'a> Transaction<'a> {
     }
 }
 
+/// FastAppendAction is a transaction action for fast append data files to the table.
+pub struct FastAppendAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema: Schema,
+    schema_id: i32,
+    format_version: FormatVersion,
+    partition_spec: PartitionSpec,
+    key_metadata: Vec<u8>,
+
+    commit_uuid: String,
+    manifest_id: i64,
+
+    appended_data_files: Vec<DataFile>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: PartitionSpec,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema,
+            schema_id,
+            format_version,
+            partition_spec,
+            key_metadata,
+            commit_uuid,
+            manifest_id: 0,
+            appended_data_files: vec![],
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_file: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.appended_data_files.extend(data_file);
+        Ok(self)
+    }
+
+    fn generate_manifest_file_path(&mut self) -> String {
+        let manifest_id = self.manifest_id;
+        self.manifest_id += 1;
+        format!(
+            "{}/{}/{}-m{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            &self.commit_uuid,
+            manifest_id,
+            DataFileFormat::Avro
+        )
+    }
+
+    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
+        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
+            let manifest_list = snapshot
+                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
+                .await?;
+            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
+            for entry in manifest_list.entries() {
+                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
+                // Why we need this?
+                if entry.added_snapshot_id == self.snapshot_id {
+                    continue;
+                }
+                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
+                // Skip manifest with all delete entries.
+                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
+                    continue;
+                }
+                manifest_files.push(entry.clone());
+            }
+            Ok(manifest_files)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    // Write manifest file for added data files and return the ManifestFile for ManifestList.
+    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
+        let appended_data_files = std::mem::take(&mut self.appended_data_files);
+        let manifest_entries = appended_data_files
+            .into_iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(self.snapshot_id)
+                    .data_file(data_file)
+                    .build()
+            })
+            .collect();
+        let manifest_meta = ManifestMetadata::builder()
+            .schema(self.schema.clone())
+            .schema_id(self.schema_id)
+            .format_version(self.format_version)
+            .partition_spec(self.partition_spec.clone())
+            .content(crate::spec::ManifestContentType::Data)
+            .build();
+        let manifest = Manifest::new(manifest_meta, manifest_entries);
+        let writer = ManifestWriter::new(
+            self.tx
+                .table
+                .file_io()
+                .new_output(self.generate_manifest_file_path())?,
+            self.snapshot_id,
+            self.key_metadata.clone(),
+        );
+        writer.write(manifest).await
+    }
+
+    fn summary(&self) -> Summary {
+        Summary {
+            operation: crate::spec::Operation::Append,
+            other: HashMap::new(),
+        }
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(mut self) -> Result<Transaction<'a>> {
+        let summary = self.summary();
+        let manifest = self.manifest_for_data_file().await?;
+        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+
+        let snapshot_produce_action = SnapshotProduceAction::new(
+            self.tx,
+            self.snapshot_id,
+            self.parent_snapshot_id,
+            self.schema_id,
+            self.format_version,
+            self.commit_uuid,
+        )?;
+
+        snapshot_produce_action
+            .apply(
+                vec![manifest]
+                    .into_iter()
+                    .chain(existing_manifest_files.into_iter()),
+                summary,
+            )
+            .await
+    }
+}
+
+struct SnapshotProduceAction<'a> {
+    tx: Transaction<'a>,
+
+    parent_snapshot_id: Option<i64>,
+    snapshot_id: i64,
+    schema_id: i32,
+    format_version: FormatVersion,
+
+    commit_uuid: String,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
+        schema_id: i32,
+        format_version: FormatVersion,
+        commit_uuid: String,
+    ) -> Result<Self> {
+        Ok(Self {
+            tx,
+            parent_snapshot_id,
+            snapshot_id,
+            schema_id,
+            format_version,
+            commit_uuid,
+        })
+    }
+
+    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+        format!(
+            "{}/{}/snap-{}-{}-{}.{}",
+            self.tx.table.metadata().location(),
+            META_ROOT_PATH,
+            self.snapshot_id,
+            next_seq_num,
+            self.commit_uuid,
+            DataFileFormat::Avro
+        )
+    }
+
+    /// Finished building the action and apply it to the transaction.
+    pub async fn apply(
+        mut self,
+        manifest_files: impl IntoIterator<Item = ManifestFile>,
+        summary: Summary,
+    ) -> Result<Transaction<'a>> {
+        let next_seq_num = if self.format_version as u8 > 1u8 {
+            self.tx.table.metadata().last_sequence_number() + 1
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+        let commit_ts = chrono::Utc::now().timestamp_millis();
+        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
+
+        let mut manifest_list_writer = ManifestListWriter::v2(
+            self.tx
+                .table
+                .file_io()
+                .new_output(manifest_list_path.clone())?,
+            self.snapshot_id,
+            // # TODO
+            // Should we use `0` here for default parent snapshot id?
+            self.parent_snapshot_id.unwrap_or_default(),
+            next_seq_num,
+        );
+        manifest_list_writer.add_manifests(manifest_files.into_iter())?;
+        manifest_list_writer.close().await?;
+
+        let new_snapshot = Snapshot::builder()
+            .with_manifest_list(manifest_list_path)
+            .with_snapshot_id(self.snapshot_id)
+            .with_parent_snapshot_id(self.parent_snapshot_id)
+            .with_sequence_number(next_seq_num)
+            .with_summary(summary)
+            .with_schema_id(self.schema_id)
+            .with_timestamp_ms(commit_ts)
+            .build();
+
+        let new_snapshot_id = new_snapshot.snapshot_id();
+        self.tx.append_updates(vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    new_snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ])?;
+        self.tx.append_requirements(vec![
+            TableRequirement::UuidMatch {
+                uuid: self.tx.table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: self.parent_snapshot_id,

Review Comment:
   This will work for now, but might get problematic later on. An important concept for Iceberg is to stack snapshots in a single commit. For example, now with append being added in this PR, we can easily add support for truncate. This would be a delete operation where all the data is being dropped, and then just an append.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579401183


##########
crates/e2e_test/tests/append_data_file_test.rs:
##########
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use parquet::file::properties::WriterProperties;
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose,
+    rest_catalog: RestCatalog,
+}
+
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+    let docker_compose = DockerCompose::new(
+        normalize_test_name(format!("{}_{func}", module_path!())),
+        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
+    );
+
+    // Start docker compose
+    docker_compose.run();
+
+    let rest_catalog_ip = docker_compose.get_container_ip("rest");
+
+    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+    loop {
+        if !scan_port_addr(&read_port) {
+            log::info!("Waiting for 1s rest catalog to ready...");
+            sleep(std::time::Duration::from_millis(1000)).await;
+        } else {
+            break;
+        }
+    }
+
+    let container_ip = docker_compose.get_container_ip("minio");

Review Comment:
   I've exposed the ports, and this allows me to just point to 127.0.0.1 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] feat: support append data file and add e2e test [iceberg-rust]

Posted by "ZENOTME (via GitHub)" <gi...@apache.org>.
ZENOTME commented on code in PR #349:
URL: https://github.com/apache/iceberg-rust/pull/349#discussion_r1579401733


##########
crates/e2e_test/testdata/docker-compose.yaml:
##########
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181
+
+  minio:

Review Comment:
   how about change `minio/minio:RELEASE.2024-03-07T00-43-48Z`, `minio/mc:RELEASE.2024-03-07T00-31-49Z` to `minio/minio:latest`, `minio/mc:latest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org