You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "Polectron (via GitHub)" <gi...@apache.org> on 2023/03/21 20:22:31 UTC

[GitHub] [iceberg] Polectron opened a new pull request, #7163: Python: Add limit parameter to table scan

Polectron opened a new pull request, #7163:
URL: https://github.com/apache/iceberg/pull/7163

   Building top of #7033 uses `pyarrow.dataset.Scanner.head(num_rows)` and a `multiprocessing.Value` to limit the number rows retrieved on `pyiceberg.io.pyarrow.project_table()`, avoids reading more files if the desired quantity specified by `limit` has been reached
   
   Closes #7013


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Polectron commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Polectron (via GitHub)" <gi...@apache.org>.
Polectron commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1144319147


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -517,15 +536,22 @@ def _file_to_table(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
-        arrow_table = pq.read_table(
-            source=fout,
-            schema=parquet_schema,
-            pre_buffer=True,
-            buffer_size=8 * ONE_MEGABYTE,
-            filters=pyarrow_filter,
+        fragment_scanner = ds.Scanner.from_fragment(
+            fragment=fragment,
+            schema=physical_schema,
+            filter=pyarrow_filter,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if limit:
+            arrow_table = fragment_scanner.head(limit)
+            with rows_counter.get_lock():

Review Comment:
   The [official python documentation for multiprocessing.Value](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value) suggests that non atomic operations like `+=` should use a lock. Otherwise a race condition could happen where multiple reads end at the same time and overwrite the row counter, causing that we keep reading rows even if we already read enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Polectron commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Polectron (via GitHub)" <gi...@apache.org>.
Polectron commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1145424387


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -517,15 +536,22 @@ def _file_to_table(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
-        arrow_table = pq.read_table(
-            source=fout,
-            schema=parquet_schema,
-            pre_buffer=True,
-            buffer_size=8 * ONE_MEGABYTE,
-            filters=pyarrow_filter,
+        fragment_scanner = ds.Scanner.from_fragment(
+            fragment=fragment,
+            schema=physical_schema,
+            filter=pyarrow_filter,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if limit:
+            arrow_table = fragment_scanner.head(limit)
+            with rows_counter.get_lock():

Review Comment:
   I think you made the right choice going for multi-threading because the task is [IO-bound and there are potentially thousands of tasks that need to be executed](https://superfastpython.com/threadpool-vs-pool-in-python/). Even though we use multi-threading which is bound to the GIL [some operations might still not be safe](https://superfastpython.com/threadpoolexecutor-thread-safe/) (like `+=` which performs a read and a write as two separate operations).
   Also, even though the tasks are run on threads in a `ThreadPool`, [`multiprocessing.Value`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value) implements a [`multiprocessing.RLock`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.RLock) by default wich is compatible with both processes and threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Polectron commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Polectron (via GitHub)" <gi...@apache.org>.
Polectron commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1145424387


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -517,15 +536,22 @@ def _file_to_table(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
-        arrow_table = pq.read_table(
-            source=fout,
-            schema=parquet_schema,
-            pre_buffer=True,
-            buffer_size=8 * ONE_MEGABYTE,
-            filters=pyarrow_filter,
+        fragment_scanner = ds.Scanner.from_fragment(
+            fragment=fragment,
+            schema=physical_schema,
+            filter=pyarrow_filter,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if limit:
+            arrow_table = fragment_scanner.head(limit)
+            with rows_counter.get_lock():

Review Comment:
   I think you actually made the right choice going for multi-threading because the task is [IO-bound and there are potentially thousands of tasks that need to be executed](https://superfastpython.com/threadpool-vs-pool-in-python/). Even though we use multi-threading which is bound to the GIL [some operations might still not be safe](https://superfastpython.com/threadpoolexecutor-thread-safe/) (like `+=` which performs a read and a write as two separate operations).
   Also, even though the tasks are run on threads in a `ThreadPool`, [`multiprocessing.Value`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value) implements a [`multiprocessing.RLock`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.RLock) by default wich is compatible with both processes and threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko merged pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko merged PR #7163:
URL: https://github.com/apache/iceberg/pull/7163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#issuecomment-1482433252

   Thanks for working on this @Polectron, this is a great addition 👍🏻 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1145289020


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -517,15 +536,22 @@ def _file_to_table(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
-        arrow_table = pq.read_table(
-            source=fout,
-            schema=parquet_schema,
-            pre_buffer=True,
-            buffer_size=8 * ONE_MEGABYTE,
-            filters=pyarrow_filter,
+        fragment_scanner = ds.Scanner.from_fragment(
+            fragment=fragment,
+            schema=physical_schema,
+            filter=pyarrow_filter,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if limit:
+            arrow_table = fragment_scanner.head(limit)
+            with rows_counter.get_lock():

Review Comment:
   Ah, thanks for the explanation. Currently, we don't do multi-processing, but multi-threading. I did some extensive testing and noticed that multi-processing wasn't substantially faster than multithreading. Probably because most time is spent in fetching the files, and reading the data, which all happens in Arrow, which bypasses the GIL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#issuecomment-1480111764

   @Polectron I noticed this as well. The current master is red because of failing Python integration tests. Maybe we can increase the timeout for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Polectron commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Polectron (via GitHub)" <gi...@apache.org>.
Polectron commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1144320360


##########
python/pyiceberg/manifest.py:
##########
@@ -177,6 +177,12 @@ class DataFile(Record):
     sort_order_id: Optional[int]
     spec_id: Optional[int]
 
+    def __setattr__(self, name: str, value: Any) -> None:
+        # The file_format is written as a string, so we need to cast it to the Enum
+        if name == "file_format":
+            value = FileFormat[value]

Review Comment:
   This was introduced by merging #7033 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Polectron commented on pull request #7163: Python: Add limit parameter to table scan

Posted by "Polectron (via GitHub)" <gi...@apache.org>.
Polectron commented on PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#issuecomment-1480148922

   @Fokko 
   Done! 🎉 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#issuecomment-1481689772

   @Polectron looks like https://github.com/apache/iceberg/pull/7148 went in, can you rebase?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1143985117


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -484,21 +488,36 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+@lru_cache
+def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
+    if file_format == FileFormat.PARQUET.value:
+        return ds.ParquetFileFormat(**kwargs)
+    elif file_format == FileFormat.ORC.value:

Review Comment:
   We want to remove this, and we can implement ORC in https://github.com/apache/iceberg/pull/7033 because it needs more work.



##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -517,15 +536,22 @@ def _file_to_table(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
-        arrow_table = pq.read_table(
-            source=fout,
-            schema=parquet_schema,
-            pre_buffer=True,
-            buffer_size=8 * ONE_MEGABYTE,
-            filters=pyarrow_filter,
+        fragment_scanner = ds.Scanner.from_fragment(
+            fragment=fragment,
+            schema=physical_schema,
+            filter=pyarrow_filter,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if limit:
+            arrow_table = fragment_scanner.head(limit)
+            with rows_counter.get_lock():

Review Comment:
   I think we can remove this lock because we already did the expensive work. This will make the code a bit simpler and avoid locking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] JonasJ-ap commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "JonasJ-ap (via GitHub)" <gi...@apache.org>.
JonasJ-ap commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1144239023


##########
python/pyiceberg/manifest.py:
##########
@@ -177,6 +177,12 @@ class DataFile(Record):
     sort_order_id: Optional[int]
     spec_id: Optional[int]
 
+    def __setattr__(self, name: str, value: Any) -> None:
+        # The file_format is written as a string, so we need to cast it to the Enum
+        if name == "file_format":
+            value = FileFormat[value]

Review Comment:
   Just curious. Is this related to changes in `pyarrow.py` or just an independent fix for parsing `file_format` in DataFile?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Polectron commented on pull request #7163: Python: Add limit parameter to table scan

Posted by "Polectron (via GitHub)" <gi...@apache.org>.
Polectron commented on PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#issuecomment-1479219168

   I don't really understand whats failing on the Python Integration test, it sometimes doesn't find some of the tables created by `provision.py`, maybe it's because in `python-integration.yml` the sleep after launching the docker container isn't long enough so the tests start running before the tables finish being created?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1145283379


##########
python/Makefile:
##########
@@ -36,7 +36,7 @@ test-integration:
 	docker-compose -f dev/docker-compose-integration.yml kill
 	docker-compose -f dev/docker-compose-integration.yml build
 	docker-compose -f dev/docker-compose-integration.yml up -d
-	sleep 20
+	sleep 30

Review Comment:
   Works for now 👍🏻 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#issuecomment-1480120800

   @Polectron thanks, this looks great! I have one final ask, could you update the docs under `python/mkdocs/docs/`? Otherwise, people won't use this awesome feature.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #7163: Python: Add limit parameter to table scan

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7163:
URL: https://github.com/apache/iceberg/pull/7163#discussion_r1146837310


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -517,15 +536,22 @@ def _file_to_table(
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
 
-        arrow_table = pq.read_table(
-            source=fout,
-            schema=parquet_schema,
-            pre_buffer=True,
-            buffer_size=8 * ONE_MEGABYTE,
-            filters=pyarrow_filter,
+        fragment_scanner = ds.Scanner.from_fragment(
+            fragment=fragment,
+            schema=physical_schema,
+            filter=pyarrow_filter,
             columns=[col.name for col in file_project_schema.columns],
         )
 
+        if limit:
+            arrow_table = fragment_scanner.head(limit)
+            with rows_counter.get_lock():

Review Comment:
   Thanks for clearing this up!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org