Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/13 19:21:59 UTC

[GitHub] [arrow] lidavidm opened a new pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

lidavidm opened a new pull request #10318:
URL: https://github.com/apache/arrow/pull/10318


   This uses a stop token to let interactive users interrupt a long-running Flight operation. It's not perfect: the operation won't be cancelled until the server delivers a message, so this doesn't protect against very slow servers. (In that case, we'd need some way for the stop source to call TryCancel() on the gRPC RPC object, which would be tricky.) But so long as the server is being responsive, this means Ctrl-C should do what people expect in Python.
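To illustrate the limitation described above, here is a minimal pure-Python sketch, not the actual Arrow implementation. The names `Cancelled`, `read_all`, and `endless_stream` are hypothetical, and a `threading.Event` stands in for the stop token. The flag is only checked once a message has arrived, so a stream that never delivers anything would never observe the request.

```python
import itertools
import threading


class Cancelled(Exception):
    """Hypothetical stand-in for pyarrow's ArrowCancelled."""


def read_all(stream, stop_requested, out):
    """Append every message from `stream` to `out` until a stop is requested.

    The flag is only checked after a message arrives, so a stream that
    never yields would never notice the request.
    """
    for message in stream:  # blocks until the "server" delivers something
        if stop_requested.is_set():
            raise Cancelled("stop requested by user")
        out.append(message)


def endless_stream(stop_requested, interrupt_at):
    """Simulated server: yields forever; the simulated Ctrl-C lands
    just before message number `interrupt_at` is delivered."""
    for i in itertools.count():
        if i == interrupt_at:
            stop_requested.set()
        yield i


stop = threading.Event()
received = []
try:
    read_all(endless_stream(stop, interrupt_at=3), stop, received)
except Cancelled:
    print("cancelled after", len(received), "messages")
```

In this sketch the read stops after three messages because the simulated server keeps delivering; if `endless_stream` blocked instead of yielding, the loop would never reach the check.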


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#discussion_r644153441



##########
File path: cpp/src/arrow/flight/client.h
##########
@@ -129,6 +133,12 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
  public:
   /// \brief Try to cancel the call.
   virtual void Cancel() = 0;
+  using MetadataRecordBatchReader::ReadAll;
+  /// \brief Consume entire stream as a vector of record batches
+  virtual Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
+                         const StopToken& stop_token) = 0;
+  /// \brief Consume entire stream as a Table
+  virtual Status ReadAll(std::shared_ptr<Table>* table, const StopToken& stop_token) = 0;

Review comment:
       I made this non-virtual.

##########
File path: python/pyarrow/tests/test_flight.py
##########
@@ -1810,3 +1813,62 @@ def test_generic_options():
                                 generic_options=options)
         with pytest.raises(pa.ArrowInvalid):
             client.do_get(flight.Ticket(b'ints'))
+
+
+class CancelFlightServer(FlightServerBase):
+    """A server for testing StopToken."""
+
+    def do_get(self, context, ticket):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        return flight.GeneratorStream(schema, itertools.repeat(rb))
+
+    def do_exchange(self, context, descriptor, reader, writer):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        writer.begin(schema)
+        while not context.is_cancelled():
+            # TODO: writing schema.empty_table() here hangs/fails
+            writer.write_batch(rb)
+            time.sleep(0.5)
+
+
+def test_interrupt():
+    if threading.current_thread().ident != threading.main_thread().ident:
+        pytest.skip("test only works from main Python thread")
+    # Skips test if not available
+    raise_signal = util.get_raise_signal()
+
+    def signal_from_thread():
+        time.sleep(0.5)
+        raise_signal(signal.SIGINT)
+
+    exc_types = (KeyboardInterrupt, pa.ArrowCancelled)
+
+    def test(read_all):
+        try:
+            try:
+                t = threading.Thread(target=signal_from_thread)
+                with pytest.raises(exc_types) as exc_info:
+                    t.start()
+                    read_all()
+            finally:
+                t.join()
+        except KeyboardInterrupt:
+            # In case KeyboardInterrupt didn't interrupt read_all
+            # above, at least prevent it from stopping the test suite
+            # pytest.fail("KeyboardInterrupt didn't interrupt Flight read_all")
+            raise
+        e = exc_info.value.__context__
+        assert isinstance(e, pa.ArrowCancelled) or isinstance(
+            e, pa.ArrowCancelled)

Review comment:
       I meant for the second to be KeyboardInterrupt.
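A sketch of what the intended check could look like, assuming the goal is to accept either exception type. This is not necessarily the code as merged; `ArrowCancelled` below is a local stand-in for `pa.ArrowCancelled` so the snippet runs without pyarrow, and `is_interrupt` is a hypothetical helper name.

```python
class ArrowCancelled(Exception):
    """Stand-in for pa.ArrowCancelled so the sketch runs without pyarrow."""


def is_interrupt(exc):
    """Return True if `exc` is either of the two expected exception types."""
    # isinstance accepts a tuple of types, which avoids repeating the call
    # and makes a copy-paste duplicate of the same type easier to spot.
    return isinstance(exc, (ArrowCancelled, KeyboardInterrupt))
```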







[GitHub] [arrow] github-actions[bot] commented on pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#issuecomment-840778565


   https://issues.apache.org/jira/browse/ARROW-12050





[GitHub] [arrow] cyb70289 commented on pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#issuecomment-843705951


   Not quite sure of the issue addressed.
   Is it possible to reproduce the issue on my local host and verify the fix?





[GitHub] [arrow] pitrou edited a comment on pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
pitrou edited a comment on pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#issuecomment-853132348


   As a side note, I think we'll need to allow adding a callback to a StopToken, so that arbitrary callables such as gRPC's `TryCancel` can be invoked. I opened a JIRA for it: https://issues.apache.org/jira/browse/ARROW-12938
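As a rough illustration of the callback idea, here is a small Python sketch. It is not Arrow's StopToken API: `CallbackStopSource`, `add_callback`, and `FakeCall` are hypothetical names, and `try_cancel` merely stands in for something like gRPC's `TryCancel`. The point is that a stop request can trigger the cancellation directly instead of waiting for the reader to poll a flag.

```python
import threading


class CallbackStopSource:
    """Hypothetical stop source that runs registered callbacks once on stop."""

    def __init__(self):
        self._lock = threading.Lock()
        self._stopped = False
        self._callbacks = []

    def add_callback(self, callback):
        """Register `callback`; run it immediately if stop was already requested."""
        with self._lock:
            if not self._stopped:
                self._callbacks.append(callback)
                return
        callback()

    def request_stop(self):
        """Mark the source as stopped and run pending callbacks exactly once."""
        with self._lock:
            if self._stopped:
                return
            self._stopped = True
            callbacks, self._callbacks = self._callbacks, []
        # Callbacks run outside the lock so they may safely query the source.
        for callback in callbacks:
            callback()

    def stop_requested(self):
        with self._lock:
            return self._stopped


class FakeCall:
    """Stand-in for an in-flight RPC exposing a TryCancel-like method."""

    def __init__(self):
        self.cancelled = False

    def try_cancel(self):
        self.cancelled = True


source = CallbackStopSource()
call = FakeCall()
source.add_callback(call.try_cancel)
source.request_stop()
source.request_stop()  # a second request is a no-op
```

Running late-registered callbacks immediately is a design choice in this sketch; it avoids a race where a stop arrives just before the RPC registers its cancellation hook.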





[GitHub] [arrow] pitrou closed pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10318:
URL: https://github.com/apache/arrow/pull/10318


   





[GitHub] [arrow] pitrou commented on a change in pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#discussion_r644150016



##########
File path: python/pyarrow/tests/test_flight.py
##########
@@ -1810,3 +1813,62 @@ def test_generic_options():
                                 generic_options=options)
         with pytest.raises(pa.ArrowInvalid):
             client.do_get(flight.Ticket(b'ints'))
+
+
+class CancelFlightServer(FlightServerBase):
+    """A server for testing StopToken."""
+
+    def do_get(self, context, ticket):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        return flight.GeneratorStream(schema, itertools.repeat(rb))
+
+    def do_exchange(self, context, descriptor, reader, writer):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        writer.begin(schema)
+        while not context.is_cancelled():
+            # TODO: writing schema.empty_table() here hangs/fails
+            writer.write_batch(rb)

Review comment:
       Ok, I was just wondering if the server could end up spending more time than necessary here :-)







[GitHub] [arrow] pitrou commented on a change in pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#discussion_r644072551



##########
File path: cpp/src/arrow/flight/client.h
##########
@@ -129,6 +133,12 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
  public:
   /// \brief Try to cancel the call.
   virtual void Cancel() = 0;
+  using MetadataRecordBatchReader::ReadAll;
+  /// \brief Consume entire stream as a vector of record batches
+  virtual Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
+                         const StopToken& stop_token) = 0;
+  /// \brief Consume entire stream as a Table
+  virtual Status ReadAll(std::shared_ptr<Table>* table, const StopToken& stop_token) = 0;

Review comment:
       Is there a reason for the `Table` version to be virtual? It seems it can simply be implemented in the base class...?
   
   Of course, it doesn't matter much as long as we have only one implementation of this.

##########
File path: python/pyarrow/tests/test_flight.py
##########
@@ -1810,3 +1813,62 @@ def test_generic_options():
                                 generic_options=options)
         with pytest.raises(pa.ArrowInvalid):
             client.do_get(flight.Ticket(b'ints'))
+
+
+class CancelFlightServer(FlightServerBase):
+    """A server for testing StopToken."""
+
+    def do_get(self, context, ticket):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        return flight.GeneratorStream(schema, itertools.repeat(rb))
+
+    def do_exchange(self, context, descriptor, reader, writer):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        writer.begin(schema)
+        while not context.is_cancelled():
+            # TODO: writing schema.empty_table() here hangs/fails
+            writer.write_batch(rb)

Review comment:
       Would this raise an exception if the context is cancelled, or just succeed immediately?

##########
File path: python/pyarrow/tests/test_flight.py
##########
@@ -1810,3 +1813,62 @@ def test_generic_options():
                                 generic_options=options)
         with pytest.raises(pa.ArrowInvalid):
             client.do_get(flight.Ticket(b'ints'))
+
+
+class CancelFlightServer(FlightServerBase):
+    """A server for testing StopToken."""
+
+    def do_get(self, context, ticket):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        return flight.GeneratorStream(schema, itertools.repeat(rb))
+
+    def do_exchange(self, context, descriptor, reader, writer):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        writer.begin(schema)
+        while not context.is_cancelled():
+            # TODO: writing schema.empty_table() here hangs/fails
+            writer.write_batch(rb)
+            time.sleep(0.5)
+
+
+def test_interrupt():
+    if threading.current_thread().ident != threading.main_thread().ident:
+        pytest.skip("test only works from main Python thread")
+    # Skips test if not available
+    raise_signal = util.get_raise_signal()
+
+    def signal_from_thread():
+        time.sleep(0.5)
+        raise_signal(signal.SIGINT)
+
+    exc_types = (KeyboardInterrupt, pa.ArrowCancelled)
+
+    def test(read_all):
+        try:
+            try:
+                t = threading.Thread(target=signal_from_thread)
+                with pytest.raises(exc_types) as exc_info:
+                    t.start()
+                    read_all()
+            finally:
+                t.join()
+        except KeyboardInterrupt:
+            # In case KeyboardInterrupt didn't interrupt read_all
+            # above, at least prevent it from stopping the test suite
+            # pytest.fail("KeyboardInterrupt didn't interrupt Flight read_all")
+            raise
+        e = exc_info.value.__context__
+        assert isinstance(e, pa.ArrowCancelled) or isinstance(
+            e, pa.ArrowCancelled)

Review comment:
       Am I reading this wrong, or is this the same condition twice?







[GitHub] [arrow] lidavidm commented on a change in pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#discussion_r644146344



##########
File path: python/pyarrow/tests/test_flight.py
##########
@@ -1810,3 +1813,62 @@ def test_generic_options():
                                 generic_options=options)
         with pytest.raises(pa.ArrowInvalid):
             client.do_get(flight.Ticket(b'ints'))
+
+
+class CancelFlightServer(FlightServerBase):
+    """A server for testing StopToken."""
+
+    def do_get(self, context, ticket):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        return flight.GeneratorStream(schema, itertools.repeat(rb))
+
+    def do_exchange(self, context, descriptor, reader, writer):
+        schema = pa.schema([])
+        rb = pa.RecordBatch.from_arrays([], schema=schema)
+        writer.begin(schema)
+        while not context.is_cancelled():
+            # TODO: writing schema.empty_table() here hangs/fails
+            writer.write_batch(rb)

Review comment:
       It should raise an exception, but either way, the client is gone at this point so there's not much the server can do.







[GitHub] [arrow] lidavidm commented on pull request #10318: ARROW-12050: [C++][Python][FlightRPC] Make Flight operations interruptible in Python

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10318:
URL: https://github.com/apache/arrow/pull/10318#issuecomment-844030751


   It solves a problem for Python users: if you press Ctrl-C during something like read_all(), nothing happens because control is in C++/Flight code, and you're stuck waiting for the call to complete.
   
   If you apply this diff to python/examples/flight/server.py:
   ```diff
   diff --git a/python/examples/flight/server.py b/python/examples/flight/server.py
   index 7a6b6697e..bc1df21d1 100644
   --- a/python/examples/flight/server.py
   +++ b/python/examples/flight/server.py
   @@ -73,6 +73,7 @@ class FlightServer(pyarrow.flight.FlightServerBase):
                yield self._make_flight_info(key, descriptor, table)
    
        def get_flight_info(self, context, descriptor):
   +        return self._make_flight_info("", descriptor, pyarrow.schema([]).empty_table())
            key = FlightServer.descriptor_to_key(descriptor)
            if key in self.flights:
                table = self.flights[key]
   @@ -86,10 +87,10 @@ class FlightServer(pyarrow.flight.FlightServerBase):
            print(self.flights[key])
    
        def do_get(self, context, ticket):
   -        key = ast.literal_eval(ticket.ticket.decode())
   -        if key not in self.flights:
   -            return None
   -        return pyarrow.flight.RecordBatchStream(self.flights[key])
   +        import itertools
   +        schema = pyarrow.schema([])
   +        rb = pyarrow.RecordBatch.from_arrays([], schema=schema)
   +        return pyarrow.flight.GeneratorStream(schema, itertools.repeat(rb))
    
        def list_actions(self, context):
            return [
   ```
   
   Then you can start the server and run the client:
   
   ```sh
   arrow/python$ env PYTHONPATH=$(pwd) python examples/flight/server.py --port 2000 &
   arrow/python$ env PYTHONPATH=$(pwd) python examples/flight/client.py get localhost:2000 -c foo
   ```
   
   Without this patch, pressing Ctrl-C leaves your client stuck forever. With this patch, you'll interrupt the call as expected.

