Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/21 17:12:12 UTC

[GitHub] [arrow] pitrou opened a new pull request #10125: [C++] WIP: C push producer interface

pitrou opened a new pull request #10125:
URL: https://github.com/apache/arrow/pull/10125


   The C push producer interface is meant to allow a push-based data transfer model between producers and consumers.
   
   It should allow multi-threaded data push, since non-trivial data production methods can often benefit from parallel execution.
   
   It should allow for backpressure, where the consumer can tell the producer to pause or resume production as desired.
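To make the push model concrete, here is a minimal single-threaded sketch. It assumes the `ArrowArrayReceiver` layout from the `abi.h` diff quoted in the review comments and the `ArrowArray` layout from the Arrow C data interface. `toy_produce`, `CountingConsumer` and `run_demo` are hypothetical illustrations, not part of the PR. It shows two receiver rules from the proposal: the consumer moves each pushed `ArrowArray` before returning, and it copies the error message because the pointer is only valid during the callback.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* ArrowArray as defined by the Arrow C data interface. */
struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

/* Receiver callbacks as proposed in PR #10125 (experimental). */
struct ArrowArrayReceiver {
  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
  void* private_data;
};

/* Hypothetical consumer state reached through receiver->private_data. */
struct CountingConsumer {
  int64_t total_rows;
  int error;
  char* error_message; /* consumer-owned copy */
};

static void toy_release_array(struct ArrowArray* array) {
  array->release = NULL; /* nothing to free in this bufferless toy array */
}

static void consumer_receive_array(struct ArrowArrayReceiver* self,
                                   struct ArrowArray* in) {
  struct CountingConsumer* state = self->private_data;
  /* Move: copy the struct, then mark the source as released so the
     producer may discard its storage as soon as we return. */
  struct ArrowArray moved = *in;
  in->release = NULL;
  state->total_rows += moved.length;
  moved.release(&moved); /* a real consumer would keep `moved` instead */
}

static void consumer_receive_error(struct ArrowArrayReceiver* self, int error,
                                   const char* message) {
  struct CountingConsumer* state = self->private_data;
  state->error = error;
  /* `message` is only valid during this call, so copy it. */
  if (message != NULL) {
    size_t n = strlen(message) + 1;
    state->error_message = malloc(n);
    if (state->error_message != NULL) memcpy(state->error_message, message, n);
  }
}

/* Hypothetical body of a start_producing implementation: pushes arrays
   synchronously (callbacks may run before start_producing returns),
   then reports an error whose message lives in a short-lived buffer. */
static void toy_produce(struct ArrowArrayReceiver* receiver) {
  const int64_t lengths[] = {3, 5};
  for (int i = 0; i < 2; ++i) {
    struct ArrowArray array;
    memset(&array, 0, sizeof(array));
    array.length = lengths[i];
    array.release = toy_release_array;
    receiver->receive_array(receiver, &array);
    assert(array.release == NULL); /* the consumer moved it */
  }
  char message[32] = "simulated I/O failure";
  receiver->receive_error(receiver, EIO, message);
}

static int64_t run_demo(int* error_out, char** message_out) {
  struct CountingConsumer state = {0, 0, NULL};
  struct ArrowArrayReceiver receiver = {consumer_receive_array,
                                        consumer_receive_error, &state};
  toy_produce(&receiver);
  *error_out = state.error;
  *message_out = state.error_message; /* caller frees */
  return state.total_rows;
}
```

Calling `run_demo` should report 8 rows (3 + 5), an `EIO` error code, and a consumer-owned copy of the message that outlives the producer's buffer.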


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r617746272



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer

Review comment:
       "Must be released" means "must have its release callback called by the consumer".
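In code, that ownership rule might look like the following minimal sketch. The `ArrowSchema` layout is the one from the Arrow C data interface. `toy_get_schema`, `toy_release_schema` and the release counter are hypothetical. The producer is only forward-declared because this toy never dereferences it.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* ArrowSchema as defined by the Arrow C data interface. */
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

/* Only pointers to the producer are used, so an incomplete type suffices. */
struct ArrowArrayProducer;

static int schema_release_calls = 0; /* demo bookkeeping */

static void toy_release_schema(struct ArrowSchema* schema) {
  /* A real producer would free whatever private_data owns here. */
  ++schema_release_calls;
  schema->release = NULL; /* marks the struct as released */
}

/* Hypothetical get_schema: a single nullable int32 field named "x". */
static void toy_get_schema(struct ArrowArrayProducer* self,
                           struct ArrowSchema* out) {
  (void)self;
  out->format = "i";
  out->name = "x";
  out->metadata = NULL;
  out->flags = 2; /* ARROW_FLAG_NULLABLE */
  out->n_children = 0;
  out->children = NULL;
  out->dictionary = NULL;
  out->release = toy_release_schema;
  out->private_data = NULL;
}

/* Consumer side: the schema's lifetime ends when the consumer calls its
   release callback, independently of when the producer is released. */
static int consume_schema(struct ArrowArrayProducer* producer) {
  struct ArrowSchema schema;
  toy_get_schema(producer, &schema);
  assert(schema.release != NULL && schema.format[0] == 'i');
  schema.release(&schema);
  assert(schema.release == NULL);
  return schema_release_calls;
}
```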





[GitHub] [arrow] pitrou commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r617745939



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);

Review comment:
       Once before `start_producing`. I'm not sure this is necessary.





[GitHub] [arrow] pitrou commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r617745586



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer.
+  // It tells the producer that the consumer is ready to be called
+  // on the ArrowArrayReceiver callbacks.
+  //
+  // The ArrowArrayReceiver callbacks may be called *before* this function
+  // returns.  Also, each of the receiver callbacks may be called concurrently,
+  // from multiple threads.
+  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
+
+  // Callback to temporarily pause producing data
+  //
+  // The consumer can use this function to apply backpressure when it is
+  // not ready to receive more data.  However, the producer may still push
+  // data after this function is called (especially if the producer is
+  // multi-threaded, as ensuring serialization may not be convenient).
+  void (*pause_producing)(struct ArrowArrayProducer*);
+
+  // Callback to resume producing data after a pause
+  //
+  // The consumer can use this function to release backpressure when it is
+  // ready to receive data again.  This must only be called after a
+  // call to `pause_producing`.
+  //
+  // Given that `pause_producing` and `resume_producing` may be called
+  // concurrently, it is recommended to implement their functionality
+  // using a counter or semaphore.
+  void (*resume_producing)(struct ArrowArrayProducer*);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer, and only
+  // after `start_producing` was called. After this call, the producer
+  // should stop pushing data. However, with multi-threaded producers,
+  // it is still possible for the receiver callbacks to be called after
+  // `stop_producing` has been called.
+  //
+  // This function must be implemented idempotently: calling it a second time
+  // (including concurrently) is a no-op.
+  void (*stop_producing)(struct ArrowArrayProducer*);
+
+  // Release callback: release the push producer's own resources.

Review comment:
       Hmm, I had forgotten about EOF :-)
   The release callback is called by the consumer.
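A consumer-side teardown consistent with these rules might look like this: stop, then release the producer (from the consumer, never from inside a receiver callback), and only then free the receiver's `private_data`, as the `ArrowArrayReceiver::private_data` comment requires. The `release` and `private_data` members of `ArrowArrayProducer` are cut off in the quoted hunk, so their signatures here are an assumption modelled on `ArrowArrayStream`. The toy producer is hypothetical. This sketch does not address how EOF is signalled, which is still open in this thread.

```c
#include <assert.h>
#include <stdlib.h>

struct ArrowArray; /* not dereferenced in this sketch */
struct ArrowSchema;

struct ArrowArrayReceiver {
  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
  void* private_data;
};

struct ArrowArrayProducer {
  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
  void (*pause_producing)(struct ArrowArrayProducer*);
  void (*resume_producing)(struct ArrowArrayProducer*);
  void (*stop_producing)(struct ArrowArrayProducer*);
  /* ASSUMPTION: not visible in the quoted hunk; modelled on ArrowArrayStream. */
  void (*release)(struct ArrowArrayProducer*);
  void* private_data;
};

/* Hypothetical toy producer state. */
struct ToyState {
  int stopped;
  int released;
};

static void toy_stop_producing(struct ArrowArrayProducer* self) {
  ((struct ToyState*)self->private_data)->stopped = 1;
}

static void toy_release(struct ArrowArrayProducer* self) {
  /* After this returns, the producer must not invoke receiver callbacks. */
  ((struct ToyState*)self->private_data)->released = 1;
  self->release = NULL; /* marks the producer as released */
}

/* Consumer-side teardown, run outside any receiver callback. */
static void consumer_shutdown(struct ArrowArrayProducer* producer,
                              struct ArrowArrayReceiver* receiver) {
  assert(producer->release != NULL);  /* not released yet */
  producer->stop_producing(producer); /* stragglers may still arrive... */
  producer->release(producer);        /* ...but not after this returns */
  free(receiver->private_data);       /* only now is this safe */
  receiver->private_data = NULL;
}

static int run_demo(void) {
  struct ToyState toy = {0, 0};
  struct ArrowArrayProducer producer = {NULL, NULL, NULL, NULL,
                                        toy_stop_producing, toy_release, &toy};
  struct ArrowArrayReceiver receiver = {NULL, NULL, malloc(64)};
  consumer_shutdown(&producer, &receiver);
  return toy.stopped && toy.released && producer.release == NULL &&
         receiver.private_data == NULL;
}
```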





[GitHub] [arrow] emkornfield commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r619881137



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);

Review comment:
       Would it be sufficient for this method to block in order to apply back-pressure?  Are there undesirable side effects of doing this?
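One way to explore the blocking alternative is a receiver whose `receive_array` waits on a condition variable while its bounded queue is full. This is a sketch assuming POSIX threads; the queue, its capacity, and all `toy_*`/`queue_*` names are hypothetical and not part of the PR.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>

/* Same layout as the Arrow C data interface ArrowArray. */
struct ArrowArray {
  int64_t length, null_count, offset, n_buffers, n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

struct ArrowArrayReceiver {
  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
  void* private_data;
};

#define QUEUE_CAPACITY 2
#define N_ARRAYS 5

/* Hypothetical bounded queue owned by the consumer. */
struct BlockingQueue {
  pthread_mutex_t mutex;
  pthread_cond_t not_full, not_empty;
  struct ArrowArray items[QUEUE_CAPACITY];
  int head, count;
};

/* Backpressure by blocking: the calling producer thread waits here
   while the queue is full. */
static void blocking_receive_array(struct ArrowArrayReceiver* self,
                                   struct ArrowArray* in) {
  struct BlockingQueue* q = self->private_data;
  pthread_mutex_lock(&q->mutex);
  while (q->count == QUEUE_CAPACITY) pthread_cond_wait(&q->not_full, &q->mutex);
  q->items[(q->head + q->count) % QUEUE_CAPACITY] = *in; /* move... */
  in->release = NULL;                                    /* ...and mark moved */
  q->count++;
  pthread_cond_signal(&q->not_empty);
  pthread_mutex_unlock(&q->mutex);
}

static struct ArrowArray queue_pop(struct BlockingQueue* q) {
  pthread_mutex_lock(&q->mutex);
  while (q->count == 0) pthread_cond_wait(&q->not_empty, &q->mutex);
  struct ArrowArray out = q->items[q->head];
  q->head = (q->head + 1) % QUEUE_CAPACITY;
  q->count--;
  pthread_cond_signal(&q->not_full);
  pthread_mutex_unlock(&q->mutex);
  return out;
}

static void toy_release_array(struct ArrowArray* a) { a->release = NULL; }

/* Toy producer thread: pushes N_ARRAYS bufferless arrays of length 1..N. */
static void* producer_thread(void* arg) {
  struct ArrowArrayReceiver* receiver = arg;
  for (int i = 0; i < N_ARRAYS; ++i) {
    struct ArrowArray array;
    memset(&array, 0, sizeof(array));
    array.length = i + 1;
    array.release = toy_release_array;
    receiver->receive_array(receiver, &array); /* may block */
  }
  return NULL;
}

static int64_t run_demo(void) {
  struct BlockingQueue q;
  pthread_mutex_init(&q.mutex, NULL);
  pthread_cond_init(&q.not_full, NULL);
  pthread_cond_init(&q.not_empty, NULL);
  q.head = q.count = 0;
  /* receive_error omitted: this toy never fails. */
  struct ArrowArrayReceiver receiver = {blocking_receive_array, NULL, &q};
  pthread_t producer;
  int rc = pthread_create(&producer, NULL, producer_thread, &receiver);
  assert(rc == 0);
  (void)rc;
  int64_t total = 0;
  for (int i = 0; i < N_ARRAYS; ++i) {
    struct ArrowArray array = queue_pop(&q);
    total += array.length;
    array.release(&array);
  }
  pthread_join(producer, NULL);
  pthread_cond_destroy(&q.not_empty);
  pthread_cond_destroy(&q.not_full);
  pthread_mutex_destroy(&q.mutex);
  return total;
}
```

Here `run_demo` should return 15 (1 + 2 + 3 + 4 + 5). The trade-off raised in the question shows up directly: a blocked `receive_array` ties up a producer thread, which may be acceptable with a dedicated thread but could starve a shared thread pool. It would also deadlock if the consumer ever tried to drain the queue from inside a receiver callback on the same thread.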





[GitHub] [arrow] westonpace commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r617741363



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer.
+  // It tells the producer that the consumer is ready to be called
+  // on the ArrowArrayReceiver callbacks.
+  //
+  // The ArrowArrayReceiver callbacks may be called *before* this function
+  // returns.  Also, each of the receiver callbacks may be called concurrently,
+  // from multiple threads.
+  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
+
+  // Callback to temporarily pause producing data
+  //
+  // The consumer can use this function to apply backpressure when it is
+  // not ready to receive more data.  However, the producer may still push
+  // data after this function is called (especially if the producer is
+  // multi-threaded, as ensuring serialization may not be convenient).
+  void (*pause_producing)(struct ArrowArrayProducer*);
+
+  // Callback to resume producing data after a pause
+  //
+  // The consumer can use this function to release backpressure when it is
+  // ready to receive data again.  This must only be called after a
+  // call to `pause_producing`.
+  //
+  // Given that `pause_producing` and `resume_producing` may be called
+  // concurrently, it is recommended to implement their functionality
+  // using a counter or semaphore.
+  void (*resume_producing)(struct ArrowArrayProducer*);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer, and only
+  // after `start_producing` was called. After this call, the producer
+  // should stop pushing data. However, with multi-threaded producers,
+  // it is still possible for the receiver callbacks to be called after
+  // `stop_producing` has been called.
+  //
+  // This function must be implemented idempotently: calling it a second time
+  // (including concurrently) is a no-op.
+  void (*stop_producing)(struct ArrowArrayProducer*);
+
+  // Release callback: release the push producer's own resources.

Review comment:
       Is the intent for this to be called by the producer when it reaches EOF?  If not, how does the producer signal it is finished?

##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer.
+  // It tells the producer that the consumer is ready to be called
+  // on the ArrowArrayReceiver callbacks.
+  //
+  // The ArrowArrayReceiver callbacks may be called *before* this function
+  // returns.  Also, each of the receiver callbacks may be called concurrently,
+  // from multiple threads.
+  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
+
+  // Callback to temporarily pause producing data
+  //
+  // The consumer can use this function to apply backpressure when it is
+  // not ready to receive more data.  However, the producer may still push
+  // data after this function is called (especially if the producer is
+  // multi-threaded, as ensuring serialization may not be convenient).
+  void (*pause_producing)(struct ArrowArrayProducer*);
+
+  // Callback to resume producing data after a pause
+  //
+  // The consumer can use this function to release backpressure when it is
+  // ready to receive data again.  This must only be called after a
+  // call to `pause_producing`.
+  //
+  // Given that `pause_producing` and `resume_producing` may be called
+  // concurrently, it is recommended to implement their functionality
+  // using a counter or semaphore.
+  void (*resume_producing)(struct ArrowArrayProducer*);
+
+  // Callback to start producing data

Review comment:
       `start` -> `stop`
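The `stop_producing` comment in this hunk also requires idempotence, including under concurrent calls. One way to get that, sketched here with C11 `<stdatomic.h>` and a hypothetical `ToyProducerState`, is an atomic exchange so that only the first caller performs the shutdown work. The trailing `release`/`private_data` members are assumed, since the quoted hunk does not show them.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct ArrowSchema;
struct ArrowArrayReceiver;

struct ArrowArrayProducer {
  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
  void (*pause_producing)(struct ArrowArrayProducer*);
  void (*resume_producing)(struct ArrowArrayProducer*);
  void (*stop_producing)(struct ArrowArrayProducer*);
  /* ASSUMPTION: not visible in the quoted hunk; modelled on ArrowArrayStream. */
  void (*release)(struct ArrowArrayProducer*);
  void* private_data;
};

/* Hypothetical producer-private state. */
struct ToyProducerState {
  atomic_bool stopped;
  atomic_int shutdown_runs; /* how often the real shutdown work ran */
};

static void toy_stop_producing(struct ArrowArrayProducer* self) {
  struct ToyProducerState* state = self->private_data;
  assert(state != NULL);
  /* atomic_exchange returns the previous value, so exactly one caller
     (the first, even among concurrent ones) observes `false`. */
  if (!atomic_exchange(&state->stopped, true)) {
    atomic_fetch_add(&state->shutdown_runs, 1);
    /* ...tell worker threads to stop pushing here... */
  }
}

static int run_demo(void) {
  struct ToyProducerState state;
  atomic_init(&state.stopped, false);
  atomic_init(&state.shutdown_runs, 0);
  struct ArrowArrayProducer producer = {0};
  producer.stop_producing = toy_stop_producing;
  producer.private_data = &state;
  producer.stop_producing(&producer);
  producer.stop_producing(&producer); /* second call is a no-op */
  return atomic_load(&state.shutdown_runs);
}
```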

##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer

Review comment:
       This sentence is a little confusing.  Maybe `The ArrowSchema will be released independently` or `The ArrowArrayProducer must allow this ArrowSchema to be released independently`?  Or maybe I am misunderstanding.

##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);

Review comment:
       How does this fit into the lifecycle?  Will it be called once before `start_producing`?  Or can it be called periodically throughout?





[GitHub] [arrow] pitrou commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r621426996



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer.
+  // It tells the producer that the consumer is ready to be called
+  // on the ArrowArrayReceiver callbacks.
+  //
+  // The ArrowArrayReceiver callbacks may be called *before* this function
+  // returns.  Also, each of the receiver callbacks may be called concurrently,
+  // from multiple threads.
+  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
+
+  // Callback to temporarily pause producing data
+  //
+  // The consumer can use this function to apply backpressure when it is
+  // not ready to receive more data.  However, the producer may still push
+  // data after this function is called (especially if the producer is
+  // multi-threaded, as ensuring serialization may not be convenient).
+  void (*pause_producing)(struct ArrowArrayProducer*);

Review comment:
       IMHO it would complicate the interface if we start introducing semaphore-based coordination (using a shared semaphore pointer?).
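By contrast, the counter recommended in the `resume_producing` comment stays entirely inside the producer and adds nothing to the ABI. Below is a minimal sketch with C11 atomics. `ToyProducerState` and `toy_may_push` are hypothetical names, and the trailing `release`/`private_data` members are assumed. A counter rather than a boolean means one consumer thread's resume does not undo another thread's still-outstanding pause.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct ArrowSchema;
struct ArrowArrayReceiver;

struct ArrowArrayProducer {
  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
  void (*pause_producing)(struct ArrowArrayProducer*);
  void (*resume_producing)(struct ArrowArrayProducer*);
  void (*stop_producing)(struct ArrowArrayProducer*);
  /* ASSUMPTION: not visible in the quoted hunk; modelled on ArrowArrayStream. */
  void (*release)(struct ArrowArrayProducer*);
  void* private_data;
};

/* Hypothetical producer-private state. */
struct ToyProducerState {
  atomic_int pause_count;
};

static void toy_pause(struct ArrowArrayProducer* self) {
  struct ToyProducerState* s = self->private_data;
  atomic_fetch_add(&s->pause_count, 1);
}

static void toy_resume(struct ArrowArrayProducer* self) {
  struct ToyProducerState* s = self->private_data;
  int previous = atomic_fetch_sub(&s->pause_count, 1);
  assert(previous > 0); /* the proposal requires resume to follow a pause */
  (void)previous;
}

/* Worker threads would check this before producing the next batch.
   Batches already in flight may still be pushed, which the proposal allows. */
static bool toy_may_push(struct ArrowArrayProducer* self) {
  struct ToyProducerState* s = self->private_data;
  return atomic_load(&s->pause_count) == 0;
}

static int run_demo(void) {
  struct ToyProducerState state;
  atomic_init(&state.pause_count, 0);
  struct ArrowArrayProducer producer = {0};
  producer.pause_producing = toy_pause;
  producer.resume_producing = toy_resume;
  producer.private_data = &state;

  producer.pause_producing(&producer);  /* consumer thread A */
  producer.pause_producing(&producer);  /* consumer thread B */
  producer.resume_producing(&producer); /* A resumes; B's pause still holds */
  int paused_after_one_resume = !toy_may_push(&producer);
  producer.resume_producing(&producer); /* B resumes */
  int running_after_both = toy_may_push(&producer);
  return paused_after_one_resume && running_after_both;
}
```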
   Also, we probably should avoid hardcoding scheduling policies in this API.




[GitHub] [arrow] emkornfield commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r619881618



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
+
+  // Callback to start producing data
+  //
+  // This function should be called once by the consumer.
+  // It tells the producer that the consumer is ready to be called
+  // on the ArrowArrayReceiver callbacks.
+  //
+  // The ArrowArrayReceiver callbacks may be called *before* this function
+  // returns.  Also, each of the receiver callbacks may be called concurrently,
+  // from multiple threads.
+  void (*start_producing)(struct ArrowArrayProducer*, struct ArrowArrayReceiver*);
+
+  // Callback to temporarily pause producing data
+  //
+  // The consumer can use this function to apply backpressure when it is
+  // not ready to receive more data.  However, the producer may still push
+  // data after this function is called (especially if the producer is
+  // multi-threaded, as ensuring serialization may not be convenient).
+  void (*pause_producing)(struct ArrowArrayProducer*);

Review comment:
       Another alternative here could be semaphore-based: consumers could give an initial count on `start_producing`, and if they need to pause, or to increase or decrease parallelism, they could manipulate the available count.
   
   I suppose this makes deadlock issues more likely?
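A rough sketch of the counting half of that semaphore idea, assuming a hypothetical `CreditCounter` that the consumer would pass to `start_producing`. Producer threads take a credit before pushing an array; the consumer returns credits after processing, grants extra ones to raise parallelism, or withdraws them to pause. Nothing here is part of the PR; it uses only C11 atomics.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical credit counter the consumer would hand to start_producing. */
struct CreditCounter {
  atomic_long available;
};

void credits_init(struct CreditCounter* c, long initial) {
  atomic_init(&c->available, initial);
}

/* Producer side: take one credit before pushing an array.
   Returns false when no credit is left; the producer should then stop
   scheduling work rather than block its thread. */
bool credits_try_acquire(struct CreditCounter* c) {
  long cur = atomic_load(&c->available);
  while (cur > 0) {
    if (atomic_compare_exchange_weak(&c->available, &cur, cur - 1)) {
      return true;
    }
    /* A failed CAS reloaded cur with the current value; retry. */
  }
  return false;
}

/* Consumer side: return credits after processing, or grant extra ones
   to allow more arrays in flight. */
void credits_release(struct CreditCounter* c, long n) {
  atomic_fetch_add(&c->available, n);
}

/* Consumer side: withdraw up to n credits (for example, to pause).
   Returns how many credits were actually withdrawn. */
long credits_withdraw(struct CreditCounter* c, long n) {
  long cur = atomic_load(&c->available);
  while (cur > 0) {
    long take = cur < n ? cur : n;
    if (atomic_compare_exchange_weak(&c->available, &cur, cur - take)) {
      return take;
    }
  }
  return 0;
}
```

What this leaves out is how a producer that found no credit gets woken up once credits come back. That needs an extra notification path between consumer and producer, and getting it wrong is where the deadlock risk mentioned above would come from.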




[GitHub] [arrow] hannesmuehleisen commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
hannesmuehleisen commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r618999558



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);
+
+  // The producer calls this callback to signal an error occurred while
+  // producing data.
+  //
+  // `error` is a non-zero `errno`-compatible error code.
+  //
+  // `message` is an optional null-terminated character array describing
+  // the error, or NULL if no description is available.  The `message`
+  // pointer is only valid until this callback returns, therefore the
+  // consumer must copy its contents if it wants to store the error message.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_error)(struct ArrowArrayReceiver*, int error, const char* message);
+
+  // Opaque consumer-specific data.
+  //
+  // This is meant to help the consumer associate calls to the above
+  // callbacks to its internal structures.  If such resources were
+  // dynamically allocated, they should only be released by the consumer
+  // after `ArrowArrayProducer::release` has been called and has returned.
+  void* private_data;
+};
+
+// Push-based array producer
+struct ArrowArrayProducer {
+  // Callback to get the produced data type
+  // (will be the same for all pushed arrays).
+  //
+  // The ArrowSchema must be released independently from the ArrowArrayProducer
+  //
+  // XXX add error return?
+  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);

Review comment:
       We do need a schema to come from somewhere.
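A minimal sketch of where the schema would come from in the proposed design: the consumer calls `get_schema` once, inspects the format string, then releases the `ArrowSchema` itself, independently of the producer. `ArrowSchema` and the `ARROW_FLAG_NULLABLE` value (2) follow the existing C data interface; the one-callback `ArrowArrayProducer` and the `int64_get_schema`/`producer_emits_int64` helpers are hypothetical. Because `get_schema` returns `void` (see the `XXX add error return?` note), the consumer here has no way to detect a failure.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Schema struct from the existing C data interface (arrow/c/abi.h). */
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

/* Hypothetical subset of the proposed producer: only get_schema. */
struct ArrowArrayProducer {
  void (*get_schema)(struct ArrowArrayProducer*, struct ArrowSchema* out);
  void* private_data;
};

/* Release callback for a schema with static strings and no children:
   marking it as released is all that is needed. */
static void release_static_schema(struct ArrowSchema* schema) {
  schema->release = NULL;
}

/* Example producer callback: every pushed array will be int64 ("l"). */
void int64_get_schema(struct ArrowArrayProducer* producer, struct ArrowSchema* out) {
  (void)producer;
  out->format = "l";
  out->name = "";
  out->metadata = NULL;
  out->flags = 2; /* ARROW_FLAG_NULLABLE */
  out->n_children = 0;
  out->children = NULL;
  out->dictionary = NULL;
  out->release = release_static_schema;
  out->private_data = NULL;
}

/* Consumer side: fetch the schema, check it, then release it.
   Returns 1 if the producer emits int64 arrays, 0 otherwise. */
int producer_emits_int64(struct ArrowArrayProducer* producer) {
  struct ArrowSchema schema;
  producer->get_schema(producer, &schema);
  int is_int64 = strcmp(schema.format, "l") == 0;
  /* The consumer owns the schema and releases it independently. */
  schema.release(&schema);
  return is_int64;
}
```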




[GitHub] [arrow] emkornfield commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r619881702



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface

Review comment:
       Might be worth an email to the mailing list to raise awareness of this.




[GitHub] [arrow] pitrou closed pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10125:
URL: https://github.com/apache/arrow/pull/10125


   


[GitHub] [arrow] pitrou commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-824224979


   cc @duckdb


[GitHub] [arrow] emkornfield commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r619881137



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);

Review comment:
       Would it be sufficient for this method to block in order to apply backpressure?  Are there undesirable side effects of doing this?
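For comparison, a minimal sketch of the blocking approach, assuming a hypothetical consumer-side bounded queue. `receive_array` moves the array into the queue (copy the struct, then mark the source as released by setting its `release` pointer to NULL, as the C data interface does for moves) and waits on a condition variable while the queue is full. `BlockingQueue`, `QUEUE_CAPACITY` and the helper names are illustrative only.

```c
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

/* Array struct from the existing C data interface (arrow/c/abi.h). */
struct ArrowArray {
  int64_t length;
  int64_t null_count;
  int64_t offset;
  int64_t n_buffers;
  int64_t n_children;
  const void** buffers;
  struct ArrowArray** children;
  struct ArrowArray* dictionary;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

#define QUEUE_CAPACITY 4 /* illustrative bound */

/* Hypothetical consumer-side bounded queue (ring buffer). */
struct BlockingQueue {
  struct ArrowArray items[QUEUE_CAPACITY];
  int head;
  int count;
  pthread_mutex_t mutex;
  pthread_cond_t not_full;
  pthread_cond_t not_empty;
};

void queue_init(struct BlockingQueue* q) {
  q->head = 0;
  q->count = 0;
  pthread_mutex_init(&q->mutex, NULL);
  pthread_cond_init(&q->not_full, NULL);
  pthread_cond_init(&q->not_empty, NULL);
}

/* Body of a blocking receive_array: waits while the queue is full,
   then moves the array in. */
void queue_receive_array(struct BlockingQueue* q, struct ArrowArray* array) {
  pthread_mutex_lock(&q->mutex);
  while (q->count == QUEUE_CAPACITY) {
    /* This parks the calling producer thread. */
    pthread_cond_wait(&q->not_full, &q->mutex);
  }
  q->items[(q->head + q->count) % QUEUE_CAPACITY] = *array;
  array->release = NULL; /* source is now marked as moved-from */
  q->count++;
  pthread_cond_signal(&q->not_empty);
  pthread_mutex_unlock(&q->mutex);
}

/* Consumer thread: waits for an array and moves it into *out. */
void queue_pop(struct BlockingQueue* q, struct ArrowArray* out) {
  pthread_mutex_lock(&q->mutex);
  while (q->count == 0) {
    pthread_cond_wait(&q->not_empty, &q->mutex);
  }
  *out = q->items[q->head];
  q->head = (q->head + 1) % QUEUE_CAPACITY;
  q->count--;
  pthread_cond_signal(&q->not_full);
  pthread_mutex_unlock(&q->mutex);
}
```

Backpressure comes for free here, but each producer thread that hits a full queue stays parked inside the callback and cannot be used for anything else until the consumer pops an item.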





[GitHub] [arrow] pitrou commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-824224677


   This proposed interface is complex by necessity. Given how unexpectedly difficult other projects have found it to implement even the simpler parts of the C data interface, I'm not sure this is a desirable extension.


[GitHub] [arrow] pitrou commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-824908669


   @hannesmuehleisen Not sure you saw this.


[GitHub] [arrow] hannesmuehleisen commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
hannesmuehleisen commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-825083138


   CC @pdet Yes I saw :D


[GitHub] [arrow] pitrou commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-828309493


   Open question: can there be multiple outputs?


[GitHub] [arrow] pitrou commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-827774106


   @duckdb Your impressions would be very welcome here :-)


[GitHub] [arrow] pitrou commented on pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#issuecomment-827824767


   I've added a callback to allow communicating EOF (together with sequence numbers to help with multi-threaded scenarios).
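The new callback's signature is not shown here, so the following is only a sketch under assumptions: each pushed array carries a sequence number 0, 1, 2, ..., and the EOF notification carries the next sequence number, i.e. the number of arrays pushed. Because arrays may arrive on several threads, the EOF notification can overtake arrays still in flight, so the consumer should only consider the stream complete once it has received that many arrays. `CompletionTracker` and the `TRACK_*` codes are hypothetical.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { TRACK_PENDING = 0, TRACK_DONE = 1, TRACK_ERROR = -1 };

struct CompletionTracker {
  pthread_mutex_t mutex;
  int64_t received;      /* arrays seen so far, in any order */
  int64_t max_seq_seen;  /* highest sequence number seen, -1 if none */
  int64_t total;         /* array count announced by EOF, -1 before EOF */
  bool done;
};

void tracker_init(struct CompletionTracker* t) {
  pthread_mutex_init(&t->mutex, NULL);
  t->received = 0;
  t->max_seq_seen = -1;
  t->total = -1;
  t->done = false;
}

/* Must be called with the mutex held. */
static int tracker_update(struct CompletionTracker* t) {
  if (t->total < 0) return TRACK_PENDING; /* EOF not seen yet */
  if (t->max_seq_seen >= t->total || t->received > t->total) return TRACK_ERROR;
  if (t->received == t->total && !t->done) {
    t->done = true; /* report completion exactly once */
    return TRACK_DONE;
  }
  return TRACK_PENDING;
}

/* Called from receive_array, possibly on several threads at once. */
int tracker_on_array(struct CompletionTracker* t, int64_t seq) {
  pthread_mutex_lock(&t->mutex);
  t->received++;
  if (seq > t->max_seq_seen) t->max_seq_seen = seq;
  int result = tracker_update(t);
  pthread_mutex_unlock(&t->mutex);
  return result;
}

/* Called from the EOF callback; eof_seq is the number of arrays pushed. */
int tracker_on_eof(struct CompletionTracker* t, int64_t eof_seq) {
  pthread_mutex_lock(&t->mutex);
  t->total = eof_seq;
  int result = tracker_update(t);
  pthread_mutex_unlock(&t->mutex);
  return result;
}
```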


[GitHub] [arrow] pitrou commented on a change in pull request #10125: [C++] WIP: C push producer interface

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10125:
URL: https://github.com/apache/arrow/pull/10125#discussion_r621425274



##########
File path: cpp/src/arrow/c/abi.h
##########
@@ -98,6 +98,116 @@ struct ArrowArrayStream {
   void* private_data;
 };
 
+// EXPERIMENTAL: C push producer interface
+
+// Consumer-provided callbacks for push producers
+//
+// Rules:
+// - any of these callbacks may be called concurrently from multiple threads
+// - any of these callbacks may call back into
+//   `ArrowArrayProducer::pause_producing`, `ArrowArrayProducer::resume_producing`
+//   or `ArrowArrayProducer::stop_producing`
+//   (but *not* into `ArrowArrayProducer::release`).
+struct ArrowArrayReceiver {
+  // The producer calls this callback to push a data item after it
+  // has filled the pointer-passed ArrowArray struct.
+  //
+  // The consumer *must* move the ArrowArray struct contents before this
+  // callback returns, as the producer is free to release it immediately
+  // afterwards.
+  //
+  // This callback is allowed to call back any ArrowArrayProducer callback,
+  // except the `release` callback.
+  void (*receive_array)(struct ArrowArrayReceiver*, struct ArrowArray* out);

Review comment:
       As noted, these callbacks can be called from multiple threads, so I don't think blocking would solve the issue satisfactorily.



