You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/18 05:11:23 UTC

[GitHub] [arrow] edponce commented on a change in pull request #12659: ARROW-15909: [Python] Log uncaught exceptions in Flight RPC handlers

edponce commented on a change in pull request #12659:
URL: https://github.com/apache/arrow/pull/12659#discussion_r829694665



##########
File path: python/pyarrow/_flight.pyx
##########
@@ -1490,20 +1491,47 @@ cdef class RecordBatchStream(FlightDataStream):
         self.data_source = data_source
         self.write_options = _get_options(options).c_options
 
-    cdef CFlightDataStream* to_stream(self) except *:
+    cdef CFlightDataStream* to_stream(self, logger) except *:
         cdef:
             shared_ptr[CRecordBatchReader] reader
         if isinstance(self.data_source, RecordBatchReader):
             reader = (<RecordBatchReader> self.data_source).reader
         elif isinstance(self.data_source, lib.Table):
-            table = (<Table> self.data_source).table
-            reader.reset(new TableBatchReader(deref(table)))
+            reader.reset(new TableBatchReader(
+                (<Table> self.data_source).sp_table))
         else:
             raise RuntimeError("Can't construct RecordBatchStream "
                                "from type {}".format(type(self.data_source)))
         return new CRecordBatchStream(reader, self.write_options)
 
 
+class _WithLogger(collections.namedtuple(
+        '_WithLogger', ['this', 'logger'])):
+    """Combine a server object with additional logging state.
+
+    This avoids exposing the logger in the public API (by storing it
+    on self, which even a name-mangled method can't avoid) and
+    potentially clashing with application code.
+    """
+
+    def log_exception(self, method, exc):
+        """Log an uncaught exception in the handler for an RPC method."""
+        if not self.logger:
+            return
+        # Don't log all kinds of errors. FlightError is explicitly for
+        # sending RPC errors; NotImplementedError is likely noise.
+        # Other errors are unclear, so log to be safe. For instance:
+        # both ValueError and ArrowInvalid map to INVALID_ARGUMENT,
+        # but ValueError is likely a stray exception (so log it) and
+        # ArrowInvalid is likely intentional (so don't)

Review comment:
       Extreme nit: this comment is missing a trailing period; it should end with `... (so don't).`

##########
File path: python/pyarrow/_flight.pyx
##########
@@ -1707,16 +1735,26 @@ cdef class ClientAuthSender(_Weakrefable):
         return result
 
 
-cdef CStatus _data_stream_next(void* self, CFlightPayload* payload) except *:
+cdef CStatus _data_stream_next(void* c_state,
+                               CFlightPayload* payload) except *:
     """Callback for implementing FlightDataStream in Python."""
+    server_state = <object> c_state
+    try:
+        if not isinstance(server_state.this, GeneratorStream):
+            raise RuntimeError(
+                "self object in callback is not GeneratorStream")
+        stream = <GeneratorStream> server_state.this
+        return _data_stream_do_next(stream, server_state, payload)
+    except Exception as e:
+        server_state.log_exception("do_get", e)

Review comment:
       Is this really a "do_get" operation? The method name passed to `log_exception` here is "do_get",
   and [in `_do_get`](https://github.com/apache/arrow/pull/12659/files#diff-0ed358f5d42920d7f94cc500791976a2c158c4d72f4a6b231393534b2d13683bR1957) the name passed is also "do_get".

##########
File path: python/pyarrow/_flight.pyx
##########
@@ -1866,41 +1919,49 @@ cdef CStatus _do_put(void* self, const CServerCallContext& context,
         FlightDescriptor descriptor = \
             FlightDescriptor.__new__(FlightDescriptor)
 
+    server_state = <object> c_state
     descriptor.descriptor = reader.get().descriptor()
     py_reader.reader.reset(reader.release())
     py_writer.writer.reset(writer.release())
     try:
-        (<object> self).do_put(ServerCallContext.wrap(context), descriptor,
-                               py_reader, py_writer)
+        server_state.this.do_put(ServerCallContext.wrap(context), descriptor,
+                                 py_reader, py_writer)
         return CStatus_OK()

Review comment:
       Nit: The general convention seems to be to place `return CStatus_OK()` at the end of the function.

##########
File path: python/pyarrow/_flight.pyx
##########
@@ -1910,27 +1971,32 @@ cdef CStatus _do_exchange(void* self, const CServerCallContext& context,
         FlightDescriptor descriptor = \
             FlightDescriptor.__new__(FlightDescriptor)
 
+    server_state = <object> c_state
     descriptor.descriptor = reader.get().descriptor()
     py_reader.reader.reset(reader.release())
     py_writer.writer.reset(writer.release())
     try:
-        (<object> self).do_exchange(ServerCallContext.wrap(context),
-                                    descriptor, py_reader, py_writer)
+        server_state.this.do_exchange(ServerCallContext.wrap(context),
+                                      descriptor, py_reader, py_writer)
         return CStatus_OK()

Review comment:
       Same nit as above: another `return CStatus_OK()` inlined inside the `try` block.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org