You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/04/10 18:21:20 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #34972: GH-34971: [Format] Enhance C-Data API to support non-cpu cases

westonpace commented on code in PR #34972:
URL: https://github.com/apache/arrow/pull/34972#discussion_r1161955047


##########
cpp/src/arrow/c/abi.h:
##########
@@ -106,6 +212,98 @@ struct ArrowArrayStream {
 
 #endif  // ARROW_C_STREAM_INTERFACE
 
+#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
+#define ARROW_C_DEVICE_STREAM_INTERFACE
+
+/// \brief Equivalent to ArrowArrayStream, but for ArrowDeviceArrays.
+///
+/// This stream is intended to provide a stream of data on a single
+/// device, if a producer wants data to be produced on multiple devices
+/// then multiple streams should be provided. One per device.
+struct ArrowDeviceArrayStream {
+  /// \brief The device that this stream produces data on.
+  ///
+  /// All ArrowDeviceArrays that are produced by this
+  /// stream should have the same device_type as set
+  /// here. The device_type needs to be provided here
+  /// so that consumers can provide the correct type
+  /// of queue_ptr when calling get_next.
+  ArrowDeviceType device_type;
+
+  /// \brief Callback to get the stream schema
+  /// (will be the same for all arrays in the stream).
+  ///
+  /// If successful, the ArrowSchema must be released independantly from the stream.
+  /// The schema should be accessible via CPU memory.
+  ///
+  /// \param[in] self The ArrowDeviceArrayStream object itself
+  /// \param[out] out C struct to export the schema to
+  /// \return 0 if successful, an `errno`-compatible error code otherwise.
+  int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* out);
+
+  /// \brief Callback to get the device id for the next array.
+  ///
+  /// This is necessary so that the proper/correct stream pointer can be provided
+  /// to get_next.
+  ///
+  /// The next call to `get_next` should provide an ArrowDeviceArray whose
+  /// device_id matches what is provided here, and whose device_type is the
+  /// same as the device_type member of this stream.

Review Comment:
   I'm not certain I follow.  Isn't the `ArrowDeviceArray` passed to `get_next` an "out" parameter?  Are you saying that the `ArrowDeviceArray` struct itself (not the buffers) needs to be allocated on the device?



##########
cpp/src/arrow/c/abi.h:
##########
@@ -106,6 +212,98 @@ struct ArrowArrayStream {
 
 #endif  // ARROW_C_STREAM_INTERFACE
 
+#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
+#define ARROW_C_DEVICE_STREAM_INTERFACE
+
+/// \brief Equivalent to ArrowArrayStream, but for ArrowDeviceArrays.
+///
+/// This stream is intended to provide a stream of data on a single
+/// device, if a producer wants data to be produced on multiple devices
+/// then multiple streams should be provided. One per device.
+struct ArrowDeviceArrayStream {
+  /// \brief The device that this stream produces data on.
+  ///
+  /// All ArrowDeviceArrays that are produced by this
+  /// stream should have the same device_type as set
+  /// here. The device_type needs to be provided here
+  /// so that consumers can provide the correct type
+  /// of queue_ptr when calling get_next.
+  ArrowDeviceType device_type;
+
+  /// \brief Callback to get the stream schema
+  /// (will be the same for all arrays in the stream).
+  ///
+  /// If successful, the ArrowSchema must be released independantly from the stream.
+  /// The schema should be accessible via CPU memory.
+  ///
+  /// \param[in] self The ArrowDeviceArrayStream object itself
+  /// \param[out] out C struct to export the schema to
+  /// \return 0 if successful, an `errno`-compatible error code otherwise.
+  int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* out);
+
+  /// \brief Callback to get the device id for the next array.
+  ///
+  /// This is necessary so that the proper/correct stream pointer can be provided
+  /// to get_next.
+  ///
+  /// The next call to `get_next` should provide an ArrowDeviceArray whose
+  /// device_id matches what is provided here, and whose device_type is the
+  /// same as the device_type member of this stream.
+  ///
+  /// \param[in] self The ArrowDeviceArrayStream object itself
+  /// \param[out] out_device_id Pointer to be populated with the device id, must not be
+  /// null \return 0 if successful, an `errno`-compatible error code otherwise.
+  int (*get_next_device_id)(struct ArrowDeviceArrayStream* self, int64_t* out_device_id);
+
+  /// \brief Callback to get the next array
+  ///
+  /// If there is no error and the returned array has been released, the stream
+  /// has ended. If successful, the ArrowArray must be released independently
+  /// from the stream.
+  ///
+  /// Because different frameworks use different types to represent this, we
+  /// accept a void* which should then be reinterpreted into whatever the
+  /// appropriate type is (e.g. cudaStream_t) for use by the producer.
+  ///
+  /// \param[in] self The ArrowDeviceArrayStream object itself
+  /// \param[in] queue_ptr The appropriate queue, stream, or
+  /// equivalent object for the device that the data is allocated on
+  /// to indicate where the consumer wants the data to be accessible.
+  /// If queue_ptr is NULL then the default stream (e.g. CUDA stream 0)
+  /// should be used to ensure that the memory is accessible from any stream.

Review Comment:
   I'm a little confused here.  It sounds like I need to call `get_next_device_id` to determine which queue to use and then I need to pass that queue on to the call to `get_next`.  But why?  Why isn't the producer managing the queues?
   
   If the producer controls which device id gets used (`get_next_device_id` seems to suggest this) then why does the consumer need to give it the queue?  For example, if I were merging streams from two different devices it seems like I would do something like (apologies for the butchered pseudo-code)...
   
   ```
   // Dummy class merging two infinite streams in an inefficient round-robin fashion
   class MergedStream {
   
     int get_next(ArrowDeviceArray* out) {
       if (merged_arrays_.empty()) {
         ArrowDeviceArray arr;
         left_.get_next(&arr);
         merged_arrays_.push(arr);
         right_.get_next(&arr);
         merged_arrays_.push(arr);
       }
       *out = merged_arrays_.pop();
     }
   };
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org