Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/01 16:11:19 UTC

[GitHub] [arrow-cookbook] lidavidm opened a new pull request #109: [Python] Add Flight streaming example

lidavidm opened a new pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109


   Fixes #86.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-cookbook] amol- merged pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
amol- merged pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109


   





[GitHub] [arrow-cookbook] amol- commented on pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
amol- commented on pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#issuecomment-999447009


   What's the state of this recipe? Does it need further review or work?





[GitHub] [arrow-cookbook] amol- commented on a change in pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
amol- commented on a change in pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#discussion_r762841563



##########
File path: python/source/flight.rst
##########
@@ -226,3 +226,198 @@ we might list all parquet files that are currently stored by the server:
     # Shutdown the server
     server.shutdown()
 
+Streaming Parquet Storage Service
+=================================
+
+We can improve the Parquet storage service and avoid holding entire datasets in
+memory by streaming data. Flight readers and writers, like others in PyArrow,
+can be iterated through, so let's update the server from before to take
+advantage of this:
+
+.. testcode::
+
+   import pathlib
+
+   import numpy as np
+   import pyarrow as pa
+   import pyarrow.flight
+   import pyarrow.parquet
+
+
+   class FlightServer(pa.flight.FlightServerBase):
+
+       def __init__(self, location="grpc://0.0.0.0:8815",
+                   repo=pathlib.Path("./datasets"), **kwargs):
+           super(FlightServer, self).__init__(location, **kwargs)
+           self._location = location
+           self._repo = repo
+
+       def _make_flight_info(self, dataset):
+           dataset_path = self._repo / dataset
+           schema = pa.parquet.read_schema(dataset_path)
+           metadata = pa.parquet.read_metadata(dataset_path)
+           descriptor = pa.flight.FlightDescriptor.for_path(
+               dataset.encode('utf-8')
+           )
+           endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
+           return pyarrow.flight.FlightInfo(schema,
+                                           descriptor,
+                                           endpoints,
+                                           metadata.num_rows,
+                                           metadata.serialized_size)
+
+       def list_flights(self, context, criteria):
+           for dataset in self._repo.iterdir():
+               yield self._make_flight_info(dataset.name)
+
+       def get_flight_info(self, context, descriptor):
+           return self._make_flight_info(descriptor.path[0].decode('utf-8'))
+
+       def do_put(self, context, descriptor, reader, writer):
+           dataset = descriptor.path[0].decode('utf-8')
+           dataset_path = self._repo / dataset
+           # Read the uploaded data and write to Parquet incrementally
+           with dataset_path.open("wb") as sink:
+               with pa.parquet.ParquetWriter(sink, reader.schema) as writer:
+                   for chunk in reader:
+                       writer.write_table(pa.Table.from_batches([chunk.data]))
+
+       def do_get(self, context, ticket):
+           dataset = ticket.ticket.decode('utf-8')
+
+           # Stream data from a generator we implement
+           if dataset == ":random:":

Review comment:
       I think that the `:random:` special dataset might be confusing for a reader in the context of this recipe. It's great for playing around, but the goal of the recipe was to read back datasets you stored, so a reader might wonder what the purpose of that code block is and get confused by it.







[GitHub] [arrow-cookbook] lidavidm commented on pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#issuecomment-999560505


   Sorry for the delay - I did update it based on your feedback, so I think it's ready for another look.





[GitHub] [arrow-cookbook] lidavidm commented on pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#issuecomment-983797551


   <details>
   <summary>Rendered</summary>
   
   ![image](https://user-images.githubusercontent.com/327919/144270548-7b4eba3a-d0b0-4367-b705-d059a15b105a.png)
   
   </details>





[GitHub] [arrow-cookbook] lidavidm commented on a change in pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#discussion_r763002195



##########
File path: python/source/flight.rst
##########

Review comment:
       Makes sense. I wanted to demonstrate GeneratorStream, so instead I'll use it to replace RecordBatchStream and log a message or something.







[GitHub] [arrow-cookbook] lidavidm commented on a change in pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#discussion_r763158068



##########
File path: python/source/flight.rst
##########

Review comment:
       That makes sense. I've removed that part of the example.







[GitHub] [arrow-cookbook] amol- commented on a change in pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
amol- commented on a change in pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#discussion_r763047700



##########
File path: python/source/flight.rst
##########

Review comment:
       I think that might be its own recipe, like "How to stream data generated on the fly". At that point it could also be a smaller example with just `do_get`.







[GitHub] [arrow-cookbook] lidavidm commented on pull request #109: [Python] Add Flight streaming example

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #109:
URL: https://github.com/apache/arrow-cookbook/pull/109#issuecomment-985727390


   CC @amol- if you have any comments here.

