Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/06/21 16:32:03 UTC

[GitHub] [arrow] lidavidm opened a new pull request, #36205: GH-16604: [C++][FlightRPC] Add async Flight client

lidavidm opened a new pull request, #36205:
URL: https://github.com/apache/arrow/pull/36205

   
   ### Rationale for this change
   
   Draft an async client API for Flight in C++.
   
   ### What changes are included in this PR?
   
   An implementation of DoAction, GetFlightInfo, and DoGet.
   
   The next challenge will be to figure out how to handle writes.
   
   ### Are these changes tested?
   
   Sort of.
   
   ### Are there any user-facing changes?
   
   Adds new APIs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242202689


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   > For queue depth: on each OnWritten callback, the application would check the queue depth and enqueue messages until there's enough work in the buffer.
   
   1) Doesn't the transport have better knowledge to decide whether there is "enough work in the buffer"?
   
   2) If the application decides to postpone the next write, until when does it postpone it? Does it fire a timer to check the queue depth again, say, 1 ms later?
   
   The point of having something like `Drain` or `ResumeProducing` is that the _transport_ informs the application that there is now enough room in the buffer.
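
To make the `Drain` / `ResumeProducing` idea concrete, here is a minimal sketch in which the queue owner (standing in for the transport) tells the producer when to pause and when to resume. Everything in it is hypothetical: `BoundedSendQueue`, the high/low water marks, and the `on_drain` callback are illustration only and are not part of this PR or of Arrow Flight.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a transport's outgoing buffer (not an Arrow API).
class BoundedSendQueue {
 public:
  BoundedSendQueue(std::size_t high_water, std::size_t low_water,
                   std::function<void()> on_drain)
      : high_water_(high_water),
        low_water_(low_water),
        on_drain_(std::move(on_drain)) {}

  // Enqueue a message.  Returns false once the queue has reached the
  // high-water mark: the producer should pause until on_drain fires.
  bool Write(std::string message) {
    pending_.push_back(std::move(message));
    if (pending_.size() >= high_water_) paused_ = true;
    return !paused_;
  }

  // Called by the (simulated) transport when one message has been sent.
  void CompleteOne() {
    if (pending_.empty()) return;
    pending_.pop_front();
    if (paused_ && pending_.size() <= low_water_) {
      paused_ = false;
      on_drain_();  // the queue owner, not the application, decides there is room
    }
  }

  std::size_t depth() const { return pending_.size(); }

 private:
  std::size_t high_water_;
  std::size_t low_water_;
  std::function<void()> on_drain_;
  std::deque<std::string> pending_;
  bool paused_ = false;
};
```

With this shape the application needs neither a timer nor an interpretation of the raw queue depth: it stops writing when `Write` returns false and resumes from the `on_drain` callback.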
   





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238769453


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;

Review Comment:
   Also, shouldn't this be able to return a `Status`?
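
As a rough illustration of what a `Status`-returning `OnNext` could allow, the sketch below lets the listener reject a message and has the driver finish the call with that error. `SketchStatus`, `StatusListener`, `Deliver`, and `PositiveOnly` are hypothetical stand-ins so the example compiles on its own; the real `arrow::Status` and the transport's actual behaviour on error are not shown here.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for arrow::Status (assumption, for self-containment).
struct SketchStatus {
  bool ok = true;
  std::string message;
  static SketchStatus OK() { return {}; }
  static SketchStatus Invalid(std::string msg) { return {false, std::move(msg)}; }
};

// Hypothetical variant of AsyncListener whose OnNext can fail.
template <typename T>
class StatusListener {
 public:
  virtual ~StatusListener() = default;
  virtual SketchStatus OnNext(T message) = 0;
  virtual void OnFinish(SketchStatus status) = 0;
};

// Simulated driver: delivers messages until one is rejected, then
// finishes the call with the listener's error.
template <typename T>
void Deliver(std::vector<T> messages, StatusListener<T>* listener) {
  for (auto& message : messages) {
    SketchStatus st = listener->OnNext(std::move(message));
    if (!st.ok) {
      listener->OnFinish(std::move(st));
      return;
    }
  }
  listener->OnFinish(SketchStatus::OK());
}

// Example listener that accepts only positive integers.
class PositiveOnly : public StatusListener<int> {
 public:
  SketchStatus OnNext(int value) override {
    if (value <= 0) return SketchStatus::Invalid("non-positive value");
    seen.push_back(value);
    return SketchStatus::OK();
  }
  void OnFinish(SketchStatus status) override { final_status = std::move(status); }

  std::vector<int> seen;
  SketchStatus final_status;
};
```

Whether a failed `OnNext` should also cancel the underlying RPC is a separate design choice that this sketch does not address.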





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242185430


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   If all transports give you the queue depth, then why not?
   But the open question is how the application would know what to do with the queue depth. That information (the queue depth) is not easy to interpret.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238792785


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   We could do that (the underlying API does not, but that's OK), but then we probably need to consider backpressure.





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238789034


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)
+  - How does this look for writes?
+  - Maybe we always call AddHold and RemoveHold? (It's unclear to me
+    why you'd want to read/write after the stream ends...)
+
+- StopToken needs to be able to run a callback, so that we can cancel gRPC calls.

Review Comment:
   Yes, I think there's an issue already open for this? It should be doable now that we use a helper thread for signal-based cancellation.
   
   The main question is the API: do we want to ensure that callbacks are automatically removed at the end of a scope? Something else?
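
One possible answer to the scoping question is an RAII registration handle, sketched below. This is only an illustration: `CallbackStopSource` and its `OnStop` / `RequestStop` methods are hypothetical and do not describe the existing `arrow::StopSource` or `arrow::StopToken` API.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stop source that can run callbacks when a stop is requested.
class CallbackStopSource {
 public:
  // RAII handle: the callback is unregistered when the handle is destroyed.
  class Registration {
   public:
    Registration(CallbackStopSource* source, std::uint64_t id)
        : source_(source), id_(id) {}
    Registration(const Registration&) = delete;
    Registration& operator=(const Registration&) = delete;
    ~Registration() { source_->Remove(id_); }

   private:
    CallbackStopSource* source_;
    std::uint64_t id_;
  };

  // Register a callback; it stays registered for the lifetime of the handle.
  std::unique_ptr<Registration> OnStop(std::function<void()> callback) {
    std::lock_guard<std::mutex> lock(mutex_);
    const std::uint64_t id = next_id_++;
    callbacks_.emplace(id, std::move(callback));
    return std::make_unique<Registration>(this, id);
  }

  // Run every currently registered callback once, outside the lock.
  // Note: this sketch does not wait for a running callback when a
  // handle is destroyed concurrently; a real design would have to.
  void RequestStop() {
    std::map<std::uint64_t, std::function<void()>> to_run;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      to_run.swap(callbacks_);
    }
    for (auto& entry : to_run) entry.second();
  }

 private:
  void Remove(std::uint64_t id) {
    std::lock_guard<std::mutex> lock(mutex_);
    callbacks_.erase(id);
  }

  std::mutex mutex_;
  std::uint64_t next_id_ = 0;
  std::map<std::uint64_t, std::function<void()>> callbacks_;
};
```

A caller would keep the returned handle on the stack for the duration of an RPC, so a callback that cancels the gRPC call cannot outlive the call it refers to. The handle must not outlive the stop source.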





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242174083


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   Note that we can limit the gRPC transport to one write at a time anyway. But it sounds desirable to allow for more general flow-control if we intend to expose better transports (such as UCX?).
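
For comparison, the "one write at a time" model can be sketched as follows: the application keeps its own backlog and issues the next write only from `OnWritten`. `OneAtATimeWriter` and `BacklogWriter` are hypothetical classes with a simulated transport; they are not the `IpcPutter` from this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Simulated transport that accepts a single outstanding write.
class OneAtATimeWriter {
 public:
  virtual ~OneAtATimeWriter() = default;

  void Write(std::string message) {
    assert(!in_flight_ && "only one write may be outstanding");
    in_flight_ = true;
    sent_.push_back(std::move(message));
  }

  // Called by the simulated transport when the outstanding write completes.
  void CompleteWrite() {
    assert(in_flight_);
    in_flight_ = false;
    OnWritten();
  }

  const std::vector<std::string>& sent() const { return sent_; }

 protected:
  virtual void OnWritten() = 0;

 private:
  bool in_flight_ = false;
  std::vector<std::string> sent_;
};

// Application side: holds a backlog and writes the next item only
// after the previous write has been acknowledged.
class BacklogWriter : public OneAtATimeWriter {
 public:
  explicit BacklogWriter(std::vector<std::string> backlog)
      : backlog_(std::move(backlog)) {}

  void Start() { WriteNext(); }
  bool all_issued() const { return next_ == backlog_.size(); }

 protected:
  void OnWritten() override { WriteNext(); }

 private:
  void WriteNext() {
    if (next_ < backlog_.size()) Write(backlog_[next_++]);
  }

  std::vector<std::string> backlog_;
  std::size_t next_ = 0;
};
```

This keeps flow control trivial, at the cost of never having more than one message queued in the transport, which is the limitation a more general mechanism would lift for other transports.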





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238782508


##########
cpp/src/arrow/flight/types.cc:
##########
@@ -771,5 +774,258 @@ arrow::Result<std::string> BasicAuth::SerializeToString() const {
 Status BasicAuth::Serialize(const BasicAuth& basic_auth, std::string* out) {
   return basic_auth.SerializeToString().Value(out);
 }
+
+//------------------------------------------------------------
+// Error propagation helpers
+
+std::string ToString(TransportStatusCode code) {
+  switch (code) {
+    case TransportStatusCode::kOk:
+      return "kOk";
+    case TransportStatusCode::kUnknown:
+      return "kUnknown";
+    case TransportStatusCode::kInternal:
+      return "kInternal";
+    case TransportStatusCode::kInvalidArgument:
+      return "kInvalidArgument";
+    case TransportStatusCode::kTimedOut:
+      return "kTimedOut";
+    case TransportStatusCode::kNotFound:
+      return "kNotFound";
+    case TransportStatusCode::kAlreadyExists:
+      return "kAlreadyExists";
+    case TransportStatusCode::kCancelled:
+      return "kCancelled";
+    case TransportStatusCode::kUnauthenticated:
+      return "kUnauthenticated";
+    case TransportStatusCode::kUnauthorized:
+      return "kUnauthorized";
+    case TransportStatusCode::kUnimplemented:
+      return "kUnimplemented";
+    case TransportStatusCode::kUnavailable:
+      return "kUnavailable";
+  }
+  return "(unknown code)";
+}
+
+TransportStatus::Impl::Impl(TransportStatusCode code, std::string message,
+                            std::vector<std::unique_ptr<StatusDetail>> details)
+    : code(code), message(std::move(message)), details(std::move(details)) {}
+TransportStatus::TransportStatus() = default;
+TransportStatus::TransportStatus(TransportStatusCode code)
+    : impl_(code == TransportStatusCode::kOk
+                ? nullptr
+                : std::make_unique<Impl>(code, "",
+                                         std::vector<std::unique_ptr<StatusDetail>>{})) {}
+TransportStatus::TransportStatus(TransportStatusCode code, std::string message)
+    : TransportStatus(code, std::move(message), {}) {}
+TransportStatus::TransportStatus(TransportStatusCode code, std::string message,
+                                 std::vector<std::unique_ptr<StatusDetail>> details)
+    : impl_(std::make_unique<Impl>(code, std::move(message), std::move(details))) {
+  if (code == TransportStatusCode::kOk) {
+    DCHECK(message.empty() && details.empty())
+        << "constructed kOk status with non-empty message/details";
+  }
+}
+
+std::string TransportStatus::ToString() const {
+  if (!impl_) {
+    return "TransportStatus{kOk}";
+  }
+  std::string str = "TransportStatus{";
+  str += arrow::flight::ToString(code());
+  str += ", message='";
+  str += message();
+  str += "', details={";
+  bool first = true;
+  for (const auto& detail : details()) {
+    if (!first) {
+      str += ", ";
+    }
+    first = false;
+
+    str += detail->ToString();
+  }
+  str += "}}";
+  return str;
+}
+
+TransportStatus TransportStatus::FromStatus(const Status& arrow_status) {
+  if (arrow_status.ok()) {
+    return TransportStatus{TransportStatusCode::kOk, ""};
+  }
+
+  TransportStatusCode code = TransportStatusCode::kUnknown;
+  std::string message = arrow_status.message();
+  if (arrow_status.detail()) {
+    message += ". Detail: ";
+    message += arrow_status.detail()->ToString();
+  }
+
+  std::shared_ptr<FlightStatusDetail> flight_status =
+      FlightStatusDetail::UnwrapStatus(arrow_status);
+  if (flight_status) {
+    switch (flight_status->code()) {
+      case FlightStatusCode::Internal:
+        code = TransportStatusCode::kInternal;
+        break;
+      case FlightStatusCode::TimedOut:
+        code = TransportStatusCode::kTimedOut;
+        break;
+      case FlightStatusCode::Cancelled:
+        code = TransportStatusCode::kCancelled;
+        break;
+      case FlightStatusCode::Unauthenticated:
+        code = TransportStatusCode::kUnauthenticated;
+        break;
+      case FlightStatusCode::Unauthorized:
+        code = TransportStatusCode::kUnauthorized;
+        break;
+      case FlightStatusCode::Unavailable:
+        code = TransportStatusCode::kUnavailable;
+        break;
+      default:
+        break;
+    }
+  } else if (arrow_status.IsKeyError()) {
+    code = TransportStatusCode::kNotFound;
+  } else if (arrow_status.IsInvalid()) {
+    code = TransportStatusCode::kInvalidArgument;
+  } else if (arrow_status.IsCancelled()) {
+    code = TransportStatusCode::kCancelled;
+  } else if (arrow_status.IsNotImplemented()) {
+    code = TransportStatusCode::kUnimplemented;
+  } else if (arrow_status.IsAlreadyExists()) {
+    code = TransportStatusCode::kAlreadyExists;
+  }
+  return TransportStatus{code, std::move(message)};
+}
+
+TransportStatus TransportStatus::FromCodeStringAndMessage(const std::string& code_str,
+                                                          std::string message) {
+  int code_int = 0;
+  try {
+    code_int = std::stoi(code_str);
+  } catch (...) {
+    return TransportStatus{
+        TransportStatusCode::kUnknown,
+        message + ". Also, server sent unknown or invalid Arrow status code " + code_str};

Review Comment:
   You mean "Flight status code"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238794301


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;

Review Comment:
   I would rather we be explicit; for instance, Arrow C++'s logging doesn't integrate into any application logging framework, which is annoying when you get unexpected, uncontrollable console spam.



[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238800571


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   Yes, but what I mean is that now we have to propagate it to the client somehow, which is difficult to deal with. (gRPC/Java and Flight/Java have this problem: you can buffer all the writes you want, and then either have to manually poll and block on a flag to respect backpressure, or deal with a fairly complicated callback that fires in weird situations. Part of the difficulty is just how gRPC's API was designed, though.)

   I will update this to allow buffering, and expose a getter that lets you estimate queue depth in terms of messages/bytes (so an application can buffer up to some amount, then wait for a callback to buffer more).
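
A minimal sketch of that buffering pattern, assuming a hypothetical `BufferedWriter` class and `FillUpTo` helper (neither name comes from the PR). The application queues writes until an estimated byte limit is reached, then waits for a completion callback before queueing more. The sketch is single-threaded; a real transport would need synchronization around the queue.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical illustration only; none of these names come from the PR.
class BufferedWriter {
 public:
  explicit BufferedWriter(std::function<void()> on_written)
      : on_written_(std::move(on_written)) {}

  // Queue a message; never blocks.
  void Write(std::string payload) {
    queued_bytes_ += payload.size();
    queue_.push_back(std::move(payload));
  }

  // Estimated queue depth, in messages and in bytes.
  std::size_t queued_messages() const { return queue_.size(); }
  std::size_t queued_bytes() const { return queued_bytes_; }

  // Stand-in for the transport finishing one write: drops the oldest
  // message and notifies the application.  Returns false if idle.
  bool CompleteOne() {
    if (queue_.empty()) return false;
    queued_bytes_ -= queue_.front().size();
    queue_.pop_front();
    if (on_written_) on_written_();
    return true;
  }

 private:
  std::function<void()> on_written_;
  std::deque<std::string> queue_;
  std::size_t queued_bytes_ = 0;
};

// Queue copies of `payload` until `count` is reached or the byte limit
// would be exceeded; returns how many messages were accepted.
inline std::size_t FillUpTo(BufferedWriter& writer, std::size_t max_bytes,
                            const std::string& payload, std::size_t count) {
  std::size_t accepted = 0;
  while (accepted < count && writer.queued_bytes() + payload.size() <= max_bytes) {
    writer.Write(payload);
    ++accepted;
  }
  return accepted;
}
```

An application would call `FillUpTo` once up front and again from the completion callback, so the amount of buffered data stays bounded without blocking a thread.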



[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238774529


##########
cpp/src/arrow/flight/types.h:
##########
@@ -716,5 +720,130 @@ class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream {
   size_t position_;
 };
 
+/// \defgroup flight-error Error Handling
+/// Types for handling errors from RPCs.  Flight uses a set of status
+/// codes standardized across Flight implementations, so these types
+/// let applications work directly with those codes instead of having
+/// to translate to and from Arrow Status.
+
+/// \brief Abstract status code for an RPC as per the Flight
+///   specification.
+enum class TransportStatusCode {
+  // TODO: document meanings
+  kOk = 0,
+  kUnknown = 1,
+  kInternal = 2,
+  kInvalidArgument = 3,
+  kTimedOut = 4,
+  kNotFound = 5,
+  kAlreadyExists = 6,
+  kCancelled = 7,
+  kUnauthenticated = 8,
+  kUnauthorized = 9,
+  kUnimplemented = 10,
+  kUnavailable = 11,
+};
+
+std::string ToString(TransportStatusCode code);
+
+/// \brief An error from an RPC call, using Flight error codes
+///   directly instead of trying to translate to Arrow Status.
+///
+/// Client-side errors (i.e., internal to the client-side Flight
+/// implementation) may use the code kInternal or kUnimplemented, or
+/// may be attached in details.
+class ARROW_FLIGHT_EXPORT TransportStatus : public util::ToStringOstreamable<Status> {
+ public:
+  /// \brief Construct an empty OK status.
+  TransportStatus();
+  explicit TransportStatus(TransportStatusCode code);
+  explicit TransportStatus(TransportStatusCode code, std::string message);
+  explicit TransportStatus(TransportStatusCode code, std::string message,
+                           std::vector<std::unique_ptr<StatusDetail>> details);
+
+  /// \brief Is this status an error or not.
+  bool ok() const { return impl_ == NULLPTR; }
+
+  /// \brief Get the code.
+  TransportStatusCode code() const {
+    return impl_ ? impl_->code : TransportStatusCode::kOk;
+  }
+
+  /// \brief Get the message.
+  const std::string& message() const {
+    static std::string kEmpty;
+    return impl_ ? impl_->message : kEmpty;
+  }
+
+  /// \brief Move the message (mutates the status).
+  std::string MoveMessage() && { return impl_ ? std::move(impl_->message) : ""; }
+
+  /// \brief Get the details.
+  const std::vector<std::unique_ptr<StatusDetail>>& details() const {
+    static std::vector<std::unique_ptr<StatusDetail>> kEmpty;
+    return impl_ ? impl_->details : kEmpty;
+  }
+
+  /// \brief Stringify the status.
+  std::string ToString() const;
+
+  /// \brief Convert an abstract transport status to a C++ status.
+  Status ToStatus() const;
+
+  /// \brief Convert a C++ status to an abstract transport status.
+  static TransportStatus FromStatus(const Status& arrow_status);
+
+  /// \brief Reconstruct a string-encoded TransportStatus.
+  static TransportStatus FromCodeStringAndMessage(const std::string& code_str,
+                                                  std::string message);
+
+ private:
+  struct Impl {
+    explicit Impl(TransportStatusCode code, std::string message,
+                  std::vector<std::unique_ptr<StatusDetail>> details);
+    TransportStatusCode code;
+    std::string message;
+    std::vector<std::unique_ptr<StatusDetail>> details;
+  };
+  std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Additional server-provided binary error metadata.  This is
+///   meant for servers to provide rich error information using a format
+///   like Protocol Buffers for sophisticated API clients.
+class BinaryStatusDetail : public StatusDetail {
+ public:
+  constexpr static const char* kTypeId = "flight::BinaryStatusDetail";
+  explicit BinaryStatusDetail(std::vector<std::byte> details)
+      : details_(std::move(details)) {}
+  const char* type_id() const override { return kTypeId; }
+  std::string ToString() const override;
+
+  const std::vector<std::byte> details() const { return details_; }

Review Comment:
   I can switch (at some point I had `variant<string, vector<byte>>` to differentiate between binary/textual data but then that got split into the explicit StatusDetail classes here)



[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238762803


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   Or weak_ptr? This way the caller can easily decide the lifetime (like a raw pointer), while not risking crashes (unlike a raw pointer).
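
A minimal sketch of the weak_ptr idea, assuming hypothetical `Listener` and `FakeTransport` types (not the API proposed in the PR). The transport holds only a `std::weak_ptr` and locks it before each callback, so a listener the caller has already destroyed is skipped rather than dereferenced.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical names for illustration; not the API proposed in the PR.
struct Listener {
  std::vector<std::string> received;
  void OnNext(std::string message) { received.push_back(std::move(message)); }
};

class FakeTransport {
 public:
  explicit FakeTransport(std::weak_ptr<Listener> listener)
      : listener_(std::move(listener)) {}

  // Deliver a message if the listener still exists.  Returns false if
  // the caller already destroyed it, instead of touching freed memory.
  bool Deliver(std::string message) {
    if (std::shared_ptr<Listener> alive = listener_.lock()) {
      alive->OnNext(std::move(message));
      return true;
    }
    return false;
  }

 private:
  std::weak_ptr<Listener> listener_;
};
```

The caller still decides the lifetime by dropping its own `shared_ptr`; the cost is that the listener has to be owned by a `shared_ptr` in the first place.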



[GitHub] [arrow] github-actions[bot] commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1601164352

   :warning: GitHub issue #16604 **has been automatically assigned in GitHub** to PR creator.


[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238793646


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)
+  - How does this look for writes?
+  - Maybe we always call AddHold and RemoveHold? (It's unclear to me
+    why you'd want to read/write after the stream ends...)
+
+- StopToken needs to be able to run a callback, so that we can cancel gRPC calls.

Review Comment:
   The code adding a callback should certainly be responsible for removing it. I think my question was: do we want to provide ways to make that easier (an RAII guard type perhaps?).
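
A minimal sketch of such an RAII guard, assuming a hypothetical `CallbackRegistry` as a stand-in for a stop source that can run callbacks (this is not Arrow's StopToken API). The guard registers the callback in its constructor and removes it in its destructor, so the code that adds a callback cannot forget to remove it.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <utility>

// Hypothetical stand-in for a stop source that can run callbacks.
class CallbackRegistry {
 public:
  using Id = std::size_t;

  Id Add(std::function<void()> callback) {
    Id id = next_id_++;
    callbacks_.emplace(id, std::move(callback));
    return id;
  }
  void Remove(Id id) { callbacks_.erase(id); }
  std::size_t size() const { return callbacks_.size(); }

  // Invoke every registered callback (e.g. when a stop is requested).
  void RequestStop() {
    for (auto& entry : callbacks_) entry.second();
  }

 private:
  Id next_id_ = 0;
  std::map<Id, std::function<void()>> callbacks_;
};

// RAII guard: registers on construction, unregisters on destruction.
class CallbackGuard {
 public:
  CallbackGuard(CallbackRegistry& registry, std::function<void()> callback)
      : registry_(registry), id_(registry.Add(std::move(callback))) {}
  ~CallbackGuard() { registry_.Remove(id_); }

  // Non-copyable so the callback is removed exactly once.
  CallbackGuard(const CallbackGuard&) = delete;
  CallbackGuard& operator=(const CallbackGuard&) = delete;

 private:
  CallbackRegistry& registry_;
  CallbackRegistry::Id id_;
};
```

The sketch is not thread-safe; a real version would have to define what happens when the stop request races with the guard's destructor.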



[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242137597


##########
cpp/src/arrow/flight/types.h:
##########
@@ -716,5 +720,130 @@ class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream {
   size_t position_;
 };
 
+/// \defgroup flight-error Error Handling
+/// Types for handling errors from RPCs.  Flight uses a set of status
+/// codes standardized across Flight implementations, so these types
+/// let applications work directly with those codes instead of having
+/// to translate to and from Arrow Status.
+
+/// \brief Abstract status code for an RPC as per the Flight
+///   specification.
+enum class TransportStatusCode {
+  // TODO: document meanings
+  kOk = 0,
+  kUnknown = 1,
+  kInternal = 2,
+  kInvalidArgument = 3,
+  kTimedOut = 4,
+  kNotFound = 5,
+  kAlreadyExists = 6,
+  kCancelled = 7,
+  kUnauthenticated = 8,
+  kUnauthorized = 9,
+  kUnimplemented = 10,
+  kUnavailable = 11,
+};
+
+std::string ToString(TransportStatusCode code);
+
+/// \brief An error from an RPC call, using Flight error codes
+///   directly instead of trying to translate to Arrow Status.
+///
+/// Client-side errors (i.e., internal to the client-side Flight
+/// implementation) may use the code kInternal or kUnimplemented, or
+/// may be attached in details.
+class ARROW_FLIGHT_EXPORT TransportStatus : public util::ToStringOstreamable<Status> {
+ public:
+  /// \brief Construct an empty OK status.
+  TransportStatus();
+  explicit TransportStatus(TransportStatusCode code);
+  explicit TransportStatus(TransportStatusCode code, std::string message);
+  explicit TransportStatus(TransportStatusCode code, std::string message,
+                           std::vector<std::unique_ptr<StatusDetail>> details);
+
+  /// \brief Is this status an error or not.
+  bool ok() const { return impl_ == NULLPTR; }
+
+  /// \brief Get the code.
+  TransportStatusCode code() const {
+    return impl_ ? impl_->code : TransportStatusCode::kOk;
+  }
+
+  /// \brief Get the message.
+  const std::string& message() const {
+    static std::string kEmpty;
+    return impl_ ? impl_->message : kEmpty;
+  }
+
+  /// \brief Move the message (mutates the status).
+  std::string MoveMessage() && { return impl_ ? std::move(impl_->message) : ""; }
+
+  /// \brief Get the details.
+  const std::vector<std::unique_ptr<StatusDetail>>& details() const {
+    static std::vector<std::unique_ptr<StatusDetail>> kEmpty;
+    return impl_ ? impl_->details : kEmpty;
+  }
+
+  /// \brief Stringify the status.
+  std::string ToString() const;
+
+  /// \brief Convert an abstract transport status to a C++ status.
+  Status ToStatus() const;
+
+  /// \brief Convert a C++ status to an abstract transport status.
+  static TransportStatus FromStatus(const Status& arrow_status);
+
+  /// \brief Reconstruct a string-encoded TransportStatus.
+  static TransportStatus FromCodeStringAndMessage(const std::string& code_str,
+                                                  std::string message);
+
+ private:
+  struct Impl {
+    explicit Impl(TransportStatusCode code, std::string message,
+                  std::vector<std::unique_ptr<StatusDetail>> details);
+    TransportStatusCode code;
+    std::string message;
+    std::vector<std::unique_ptr<StatusDetail>> details;
+  };
+  std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Additional server-provided binary error metadata.  This is
+///   meant for servers to provide rich error information using a format
+///   like Protocol Buffers for sophisticated API clients.
+class BinaryStatusDetail : public StatusDetail {
+ public:
+  constexpr static const char* kTypeId = "flight::BinaryStatusDetail";
+  explicit BinaryStatusDetail(std::vector<std::byte> details)
+      : details_(std::move(details)) {}
+  const char* type_id() const override { return kTypeId; }
+  std::string ToString() const override;
+
+  const std::vector<std::byte> details() const { return details_; }

Review Comment:
   We're already using `std::string` for both string and binary data across the codebase.
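
A small sketch of why `std::string` works for binary payloads, using a hypothetical `MakeBinary` helper: the string stores an explicit length, so embedded NUL bytes and bytes above 0x7f are preserved.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: build a std::string from raw bytes.  Passing the
// length explicitly keeps embedded NUL bytes instead of stopping at them.
inline std::string MakeBinary(const char* data, std::size_t size) {
  return std::string(data, size);
}
```

Callers read the bytes back through `data()` and `size()` rather than treating the contents as a NUL-terminated C string.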



[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238789291


##########
cpp/src/arrow/flight/types.h:
##########
@@ -716,5 +720,130 @@ class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream {
   size_t position_;
 };
 
+/// \defgroup flight-error Error Handling
+/// Types for handling errors from RPCs.  Flight uses a set of status
+/// codes standardized across Flight implementations, so these types
+/// let applications work directly with those codes instead of having
+/// to translate to and from Arrow Status.
+
+/// \brief Abstract status code for an RPC as per the Flight
+///   specification.

Review Comment:
   FlightStatusCode was a kludge in the original implementation to add to the arrow::StatusCode list without modifying StatusCode. This type was refactored out of transport.h and is intended to be definitive. (Both the gRPC and UCX backends already translate to it; the synchronous API then translates that into arrow::Status + FlightStatusCode.) I would rather make a clean break than try to repurpose or extend FlightStatusCode (which also contains statuses that do not actually exist in Flight on the wire).



[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238785688


##########
cpp/src/arrow/flight/types.h:
##########
@@ -716,5 +720,130 @@ class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream {
   size_t position_;
 };
 
+/// \defgroup flight-error Error Handling
+/// Types for handling errors from RPCs.  Flight uses a set of status
+/// codes standardized across Flight implementations, so these types
+/// let applications work directly with those codes instead of having
+/// to translate to and from Arrow Status.
+
+/// \brief Abstract status code for an RPC as per the Flight
+///   specification.

Review Comment:
   How come this is different from FlightStatusCode? Do we want to deprecate the latter?
   



[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238900376


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   We could use a non-const reference to express this more strongly? weak_ptr seems rather awkward to use here unless you already have a shared_ptr (in which case, you'd probably rather just pass shared_ptr), but I dislike forcing heap allocation here.



[GitHub] [arrow] lidavidm commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1611980986

   It turns out queueing writes is quite complicated since gRPC apparently does not wait for your callback to finish before invoking it again, so we are going to have to take some locks...


[GitHub] [arrow] lidavidm commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1624325937

   Split out here: https://github.com/apache/arrow/pull/36517


[GitHub] [arrow] pitrou commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1621713542

   @lidavidm Sounds good to me!


[GitHub] [arrow] lidavidm commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1620583925

   Rebased/squashed and converted GetFlightInfo to take shared_ptr. If that looks roughly reasonable I'll split out all other changes and add tests for async GetFlightInfo.


[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1243900914


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   I'll draft out the shared_ptr locally (I suppose you can always wrap a reference to a stack allocated object if the allocation matters)
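
A minimal sketch of wrapping a stack-allocated object, assuming hypothetical `StackListener`, `NonOwning`, and `CallWithListener` names (none are from the PR). The `shared_ptr` is given a deleter that does nothing, so the listener itself is not heap-allocated and is never freed by the pointer. The control block is still typically heap-allocated, and the caller must keep the object alive until the RPC has finished.

```cpp
#include <memory>

// Hypothetical listener type for illustration.
struct StackListener {
  int calls = 0;
  void OnFinish() { ++calls; }
};

// Wrap an object the caller owns (e.g. on the stack) in a shared_ptr
// whose deleter does nothing, so no ownership is transferred.
template <typename T>
std::shared_ptr<T> NonOwning(T& object) {
  return std::shared_ptr<T>(&object, [](T*) {});
}

// Stand-in for an async API that takes its listener by shared_ptr.
inline void CallWithListener(std::shared_ptr<StackListener> listener) {
  listener->OnFinish();
}
```

This gives up the lifetime safety that motivates `shared_ptr` in the first place, so it only makes sense where the caller can guarantee the object outlives every callback.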



[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1241957024


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   The main challenge IMHO is to ensure proper lifetime management. If you don't pass a managed pointer (either shared or weak), the async machinery might invoke a listener that doesn't exist anymore.
   
   By the way, some cancellation mechanism might also be desired?



[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242181463


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   I am going to implement queueing, at least.
   
   I think Start/StopProducing could be implemented on top of just knowing the queue depth.
   
   gRPC already has backpressure anyway: it allows only one outstanding write at a time, but a write is considered complete once it enters an internal buffer, so gRPC is effectively giving you backpressure by delaying the OnWrite callback.
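
The idea that Start/StopProducing can be derived from queue depth alone could look roughly like the sketch below. `WriteQueue` and its high/low water marks are assumptions made up for illustration; nothing here is the queue that the PR actually implements.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical write queue; the names and thresholds are illustrative only.
class WriteQueue {
 public:
  WriteQueue(std::size_t high_water, std::size_t low_water)
      : high_water_(high_water), low_water_(low_water) {
    assert(low_water_ < high_water_);
  }

  // Enqueue a message. Returns false once the producer should pause
  // (the "StopProducing" signal).
  bool Push(std::string message) {
    queue_.push_back(std::move(message));
    if (queue_.size() >= high_water_) producing_ = false;
    return producing_;
  }

  // Called when the transport reports a completed write. Returns true
  // exactly when a paused producer should resume ("StartProducing").
  bool OnWritten() {
    if (!queue_.empty()) queue_.pop_front();
    if (!producing_ && queue_.size() <= low_water_) {
      producing_ = true;
      return true;
    }
    return false;
  }

  std::size_t depth() const { return queue_.size(); }
  bool producing() const { return producing_; }

 private:
  std::size_t high_water_;
  std::size_t low_water_;
  std::deque<std::string> queue_;
  bool producing_ = true;
};
```

Using two thresholds rather than one avoids toggling the producer on every single write when the depth hovers around the limit.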





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242270436


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   > the application would just do the check the next time OnWritten is called (it's called for _each_ message sent). So you can think of OnWritten as ResumeProducing, if you want.
   
   Ah! Well, fair enough, then.





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238768981


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;

Review Comment:
   If the templating is annoying, perhaps this could be `void OnNext(std::any)`?
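
For reference, a non-templated listener along these lines might look like the sketch below. `AnyListener` and `StringListener` are hypothetical names, and `std::string` stands in for a real Flight message type. The trade-off it illustrates is that a type mismatch becomes a runtime condition the listener has to handle, instead of a compile error.

```cpp
#include <any>
#include <cassert>
#include <string>
#include <utility>

// Hypothetical non-templated listener; not the interface in this PR.
struct AnyListener {
  virtual ~AnyListener() = default;
  virtual void OnNext(std::any message) = 0;
};

// A listener that expects std::string payloads.
struct StringListener : AnyListener {
  std::string last;
  int type_errors = 0;

  void OnNext(std::any message) override {
    // The pointer form of any_cast returns nullptr on a type mismatch
    // instead of throwing std::bad_any_cast.
    if (auto* value = std::any_cast<std::string>(&message)) {
      last = std::move(*value);
    } else {
      ++type_errors;
    }
  }
};
```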





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238794991


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   Indeed... but backpressure is precisely the kind of concern that the transport is better equipped to deal with, no?





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238802080


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)
+  - How does this look for writes?
+  - Maybe we always call AddHold and RemoveHold? (It's unclear to me
+    why you'd want to read/write after the stream ends...)
+
+- StopToken needs to be able to run a callback, so that we can cancel gRPC calls.

Review Comment:
   Ah. 
   
   Frankly, I don't intend to tackle it here, so I'd rather wait and see how things look once it's wired up. (My thought, though, was that if you provide a StopToken in the CallOptions - which is already possible - we should attach and detach the callback at appropriate times.)





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242163523


##########
cpp/src/arrow/flight/transport.h:
##########
@@ -223,44 +236,30 @@ ARROW_FLIGHT_EXPORT
 TransportRegistry* GetDefaultTransportRegistry();
 
 //------------------------------------------------------------
-// Error propagation helpers
-
-/// \brief Abstract status code as per the Flight specification.
-enum class TransportStatusCode {
-  kOk = 0,
-  kUnknown = 1,
-  kInternal = 2,
-  kInvalidArgument = 3,
-  kTimedOut = 4,
-  kNotFound = 5,
-  kAlreadyExists = 6,
-  kCancelled = 7,
-  kUnauthenticated = 8,
-  kUnauthorized = 9,
-  kUnimplemented = 10,
-  kUnavailable = 11,
-};
+// Async APIs
 
-/// \brief Abstract error status.
+/// \brief Transport-specific state for an async RPC.
 ///
-/// Transport implementations may use side channels (e.g. HTTP
-/// trailers) to convey additional information to reconstruct the
-/// original C++ status for implementations that can use it.
-struct ARROW_FLIGHT_EXPORT TransportStatus {
-  TransportStatusCode code;
-  std::string message;
-
-  /// \brief Convert a C++ status to an abstract transport status.
-  static TransportStatus FromStatus(const Status& arrow_status);
-
-  /// \brief Reconstruct a string-encoded TransportStatus.
-  static TransportStatus FromCodeStringAndMessage(const std::string& code_str,
-                                                  std::string message);
+/// Transport implementations may subclass this to store their own
+/// state, and stash an instance in a user-supplied AsyncListener via
+/// ClientTransport::GetAsyncRpc and ClientTransport::SetAsyncRpc.
+class AsyncRpc {
+ public:
+  virtual ~AsyncRpc() = default;
+  /// \brief Request cancellation of the RPC.
+  virtual void TryCancel() {}
 
-  /// \brief Convert an abstract transport status to a C++ status.
-  Status ToStatus() const;
+  /// Only needed for DoPut/DoExchange
+  virtual void Begin(std::shared_ptr<Schema> schema, ipc::IpcWriteOptions options) {}
+  /// Only needed for DoPut/DoExchange
+  virtual void Write(arrow::flight::FlightStreamChunk chunk, bool last) {}
+  /// Only needed for DoPut/DoExchange
+  virtual void DoneWriting() {}

Review Comment:
   Is there a difference between calling `DoneWriting` and calling `Write(..., last=true)`?





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242208705


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   1: well, yes, that's why gRPC doesn't call the OnWritten callback until it's ready for more work!
   
   2: the application would just do the check the next time OnWritten is called (it's called for *each* message sent). So you can think of OnWritten as ResumeProducing, if you want.
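
Read as "ResumeProducing", the callback pattern amounts to writing the next message only when the previous write has completed. The sketch below uses a hypothetical `FakeStream` that accepts one write at a time and a hypothetical `Producer` driven by `OnWritten`; both are illustrations, not the `IpcPutter` class from the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a transport that allows one outstanding write.
class FakeStream {
 public:
  void Write(std::string message) {
    assert(!write_pending_);  // writing again before completion is a bug
    sent_.push_back(std::move(message));
    write_pending_ = true;
  }

  // Simulates the transport finishing the outstanding write.
  void CompleteWrite() { write_pending_ = false; }

  bool write_pending() const { return write_pending_; }
  const std::vector<std::string>& sent() const { return sent_; }

 private:
  std::vector<std::string> sent_;
  bool write_pending_ = false;
};

// Sends its messages one at a time, advancing only from OnWritten.
class Producer {
 public:
  Producer(FakeStream* stream, std::vector<std::string> messages)
      : stream_(stream), messages_(std::move(messages)) {}

  void Start() { WriteNext(); }

  // Invoked once per completed write, so it doubles as the resume signal.
  void OnWritten() { WriteNext(); }

  bool done() const { return next_ == messages_.size(); }

 private:
  void WriteNext() {
    if (next_ < messages_.size()) stream_->Write(messages_[next_++]);
  }

  FakeStream* stream_;
  std::vector<std::string> messages_;
  std::size_t next_ = 0;
};
```

Because the transport decides when `OnWritten` fires, delaying that callback is all it takes to slow the producer down.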





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242176908


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {

Review Comment:
   DoPut returns metadata from the server (the intent was to use this to implement resumable uploads).





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1243867859


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   Hmm, the problem is then we have a circular reference (and weak_ptr just punts the problem). The caller has state (the callbacks), and the transport has state (the actual RPC), and they both need to reference each other (the callbacks need to update the transport, and the transport needs to invoke the callbacks). 
   
   I suppose we could very carefully drop the weak/shared_ptr holding the callbacks from the RPC state when the RPC finishes, breaking the loop...? But I would rather not create the situation in the first place.
   
   For what it's worth, the gRPC API expects the caller to hold the RPC state alive (that's the grpc::ClientContext).
   
   I suppose a different solution would be to force the transport to hold all the state (e.g. by stashing it in the client object). This makes the client worse for concurrent RPCs (now they're all blocking on that central state, even if only for a short time).
   
   Another option might be to change the return type from `void` to some object containing the RPC state. The caller would then hold this alive (and internally it would own the callbacks). You would still have to heap-allocate the callback structure.
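
The circular-reference problem, and the "carefully drop the pointer when the RPC finishes" workaround, can be reproduced in a few lines. `Callbacks`, `RpcState`, and `CallbacksOutliveCaller` are hypothetical names for this sketch only; the real classes carry far more state.

```cpp
#include <cassert>
#include <memory>

struct RpcState;

// Caller-side state: needs the RPC in order to cancel or write.
struct Callbacks {
  std::shared_ptr<RpcState> rpc;
};

// Transport-side state: needs the callbacks in order to deliver results.
struct RpcState {
  std::shared_ptr<Callbacks> callbacks;

  // Called when the RPC completes; dropping the reference breaks the cycle.
  void Finish() { callbacks.reset(); }
};

// Wires the two objects together, optionally finishes the RPC, then lets
// the caller's references go out of scope. Returns true if the callbacks
// object is still alive afterwards, i.e. the cycle kept it from being freed.
bool CallbacksOutliveCaller(bool finish_rpc) {
  std::weak_ptr<Callbacks> observer;
  {
    auto callbacks = std::make_shared<Callbacks>();
    auto rpc = std::make_shared<RpcState>();
    callbacks->rpc = rpc;
    rpc->callbacks = callbacks;
    observer = callbacks;
    if (finish_rpc) rpc->Finish();
  }
  const bool still_alive = !observer.expired();
  // Break any remaining cycle so that the sketch itself does not leak.
  if (auto leaked = observer.lock()) leaked->rpc.reset();
  return still_alive;
}
```

`CallbacksOutliveCaller(false)` reports that the objects survive the caller, and `CallbacksOutliveCaller(true)` reports that they are freed. This shows that the approach works only if every completion path remembers to call `Finish`.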





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242133797


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   That API is modeled after [Twisted's](https://docs.twisted.org/en/stable/core/howto/producers.html#push-producers) flow-control mechanism.
   
   Another possibility is to expose an [asyncio-like](https://docs.python.org/3.13/library/asyncio-stream.html#asyncio.StreamWriter.drain) API, for example a `Future<> Drain()` method on the transport.
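
As a rough sketch of what a drain-style API could look like, the example below uses `std::future<void>` as a stand-in for Arrow's `Future<>` so that it compiles on its own. `DrainableWriter`, `OnWriteFlushed`, and the high-water-mark policy are assumptions made for illustration; nothing here is taken from the PR, Twisted, or asyncio.

```cpp
#include <cstddef>
#include <future>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical writer; names and policy are assumptions for illustration.
class DrainableWriter {
 public:
  explicit DrainableWriter(std::size_t high_water_mark)
      : high_water_mark_(high_water_mark) {}

  // Queue one message for sending; never blocks.
  void Write() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++pending_;
  }

  // Returns a future that becomes ready once the number of pending writes
  // is at or below the high-water mark.  This sketch supports only one
  // outstanding Drain() at a time.
  std::future<void> Drain() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::promise<void> promise;
    std::future<void> future = promise.get_future();
    if (pending_ <= high_water_mark_) {
      promise.set_value();  // Nothing to wait for.
    } else {
      waiter_ = std::move(promise);
    }
    return future;
  }

  // Called by the (simulated) transport when one write has been flushed.
  void OnWriteFlushed() {
    std::optional<std::promise<void>> ready;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (pending_ > 0) --pending_;
      if (waiter_ && pending_ <= high_water_mark_) {
        ready = std::move(waiter_);
        waiter_.reset();
      }
    }
    // Complete the future outside the lock.
    if (ready) ready->set_value();
  }

 private:
  std::mutex mutex_;
  std::size_t high_water_mark_;
  std::size_t pending_ = 0;
  std::optional<std::promise<void>> waiter_;
};
```

An application would call `Write()` freely and wait on `Drain()` whenever it wants to apply backpressure, instead of reacting to a callback after every write.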





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242135455


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {

Review Comment:
   I'm curious: why would `DoPut` listen for incoming buffers?





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238769952


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.

Review Comment:
   This is an async-write, right?





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238773857


##########
cpp/src/arrow/flight/types.h:
##########
@@ -716,5 +720,130 @@ class ARROW_FLIGHT_EXPORT SimpleResultStream : public ResultStream {
   size_t position_;
 };
 
+/// \defgroup flight-error Error Handling
+/// Types for handling errors from RPCs.  Flight uses a set of status
+/// codes standardized across Flight implementations, so these types
+/// let applications work directly with those codes instead of having
+/// to translate to and from Arrow Status.
+
+/// \brief Abstract status code for an RPC as per the Flight
+///   specification.
+enum class TransportStatusCode {
+  // TODO: document meanings
+  kOk = 0,
+  kUnknown = 1,
+  kInternal = 2,
+  kInvalidArgument = 3,
+  kTimedOut = 4,
+  kNotFound = 5,
+  kAlreadyExists = 6,
+  kCancelled = 7,
+  kUnauthenticated = 8,
+  kUnauthorized = 9,
+  kUnimplemented = 10,
+  kUnavailable = 11,
+};
+
+std::string ToString(TransportStatusCode code);
+
+/// \brief An error from an RPC call, using Flight error codes
+///   directly instead of trying to translate to Arrow Status.
+///
+/// Client-side errors (i.e., internal to the client-side Flight
+/// implementation) may use the code kInternal or kUnimplemented, or
+/// may be attached in details.
+class ARROW_FLIGHT_EXPORT TransportStatus : public util::ToStringOstreamable<Status> {
+ public:
+  /// \brief Construct an empty OK status.
+  TransportStatus();
+  explicit TransportStatus(TransportStatusCode code);
+  explicit TransportStatus(TransportStatusCode code, std::string message);
+  explicit TransportStatus(TransportStatusCode code, std::string message,
+                           std::vector<std::unique_ptr<StatusDetail>> details);
+
+  /// \brief Is this status an error or not.
+  bool ok() const { return impl_ == NULLPTR; }
+
+  /// \brief Get the code.
+  TransportStatusCode code() const {
+    return impl_ ? impl_->code : TransportStatusCode::kOk;
+  }
+
+  /// \brief Get the message.
+  const std::string& message() const {
+    static std::string kEmpty;
+    return impl_ ? impl_->message : kEmpty;
+  }
+
+  /// \brief Move the message (mutates the status).
+  std::string MoveMessage() && { return impl_ ? std::move(impl_->message) : ""; }
+
+  /// \brief Get the details.
+  const std::vector<std::unique_ptr<StatusDetail>>& details() const {
+    static std::vector<std::unique_ptr<StatusDetail>> kEmpty;
+    return impl_ ? impl_->details : kEmpty;
+  }
+
+  /// \brief Stringify the status.
+  std::string ToString() const;
+
+  /// \brief Convert an abstract transport status to a C++ status.
+  Status ToStatus() const;
+
+  /// \brief Convert a C++ status to an abstract transport status.
+  static TransportStatus FromStatus(const Status& arrow_status);
+
+  /// \brief Reconstruct a string-encoded TransportStatus.
+  static TransportStatus FromCodeStringAndMessage(const std::string& code_str,
+                                                  std::string message);
+
+ private:
+  struct Impl {
+    explicit Impl(TransportStatusCode code, std::string message,
+                  std::vector<std::unique_ptr<StatusDetail>> details);
+    TransportStatusCode code;
+    std::string message;
+    std::vector<std::unique_ptr<StatusDetail>> details;
+  };
+  std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Additional server-provided binary error metadata.  This is
+///   meant for servers to provide rich error information using a format
+///   like Protocol Buffers for sophisticated API clients.
+class BinaryStatusDetail : public StatusDetail {
+ public:
+  constexpr static const char* kTypeId = "flight::BinaryStatusDetail";
+  explicit BinaryStatusDetail(std::vector<std::byte> details)
+      : details_(std::move(details)) {}
+  const char* type_id() const override { return kTypeId; }
+  std::string ToString() const override;
+
+  const std::vector<std::byte> details() const { return details_; }

Review Comment:
   It seems a bit gratuitous to introduce `std::byte` in our codebase. Why not simply use `std::string`?
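
A minimal sketch of the `std::string` variant follows, assuming the payload is opaque bytes that may contain NULs. `BinaryDetailSketch` and `MakeExampleDetail` are hypothetical names; the class deliberately does not derive from `StatusDetail` so that the example stands alone.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for the detail class under review.
class BinaryDetailSketch {
 public:
  explicit BinaryDetailSketch(std::string details) : details_(std::move(details)) {}

  // Returned by const reference, so callers do not copy the payload.
  const std::string& details() const { return details_; }

 private:
  std::string details_;
};

// Build a payload with an embedded NUL byte.  The (pointer, length)
// constructor is required here; constructing from a bare C string would
// stop at the first '\0'.
inline BinaryDetailSketch MakeExampleDetail() {
  const char raw[] = {'\x08', '\x00', '\x2a'};
  return BinaryDetailSketch(std::string(raw, sizeof(raw)));
}
```

`std::string` tracks its length explicitly, so arbitrary binary data round-trips intact as long as it is never passed through a C-string interface.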





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242185430


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   If all transports give you the queue depth, then why not?
   But the open question is how the application would know what to do with the queue depth. It's not easy to interpret.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242178102


##########
cpp/src/arrow/flight/transport.h:
##########
@@ -223,44 +236,30 @@ ARROW_FLIGHT_EXPORT
 TransportRegistry* GetDefaultTransportRegistry();
 
 //------------------------------------------------------------
-// Error propagation helpers
-
-/// \brief Abstract status code as per the Flight specification.
-enum class TransportStatusCode {
-  kOk = 0,
-  kUnknown = 1,
-  kInternal = 2,
-  kInvalidArgument = 3,
-  kTimedOut = 4,
-  kNotFound = 5,
-  kAlreadyExists = 6,
-  kCancelled = 7,
-  kUnauthenticated = 8,
-  kUnauthorized = 9,
-  kUnimplemented = 10,
-  kUnavailable = 11,
-};
+// Async APIs
 
-/// \brief Abstract error status.
+/// \brief Transport-specific state for an async RPC.
 ///
-/// Transport implementations may use side channels (e.g. HTTP
-/// trailers) to convey additional information to reconstruct the
-/// original C++ status for implementations that can use it.
-struct ARROW_FLIGHT_EXPORT TransportStatus {
-  TransportStatusCode code;
-  std::string message;
-
-  /// \brief Convert a C++ status to an abstract transport status.
-  static TransportStatus FromStatus(const Status& arrow_status);
-
-  /// \brief Reconstruct a string-encoded TransportStatus.
-  static TransportStatus FromCodeStringAndMessage(const std::string& code_str,
-                                                  std::string message);
+/// Transport implementations may subclass this to store their own
+/// state, and stash an instance in a user-supplied AsyncListener via
+/// ClientTransport::GetAsyncRpc and ClientTransport::SetAsyncRpc.
+class AsyncRpc {
+ public:
+  virtual ~AsyncRpc() = default;
+  /// \brief Request cancellation of the RPC.
+  virtual void TryCancel() {}
 
-  /// \brief Convert an abstract transport status to a C++ status.
-  Status ToStatus() const;
+  /// Only needed for DoPut/DoExchange
+  virtual void Begin(std::shared_ptr<Schema> schema, ipc::IpcWriteOptions options) {}
+  /// Only needed for DoPut/DoExchange
+  virtual void Write(arrow::flight::FlightStreamChunk chunk, bool last) {}
+  /// Only needed for DoPut/DoExchange
+  virtual void DoneWriting() {}

Review Comment:
   No, I'm actually removing this (it is a slight optimization that gRPC implements, but I don't think it's worth exposing here).



##########
cpp/src/arrow/flight/transport.h:
##########
@@ -223,44 +236,30 @@ ARROW_FLIGHT_EXPORT
 TransportRegistry* GetDefaultTransportRegistry();
 
 //------------------------------------------------------------
-// Error propagation helpers
-
-/// \brief Abstract status code as per the Flight specification.
-enum class TransportStatusCode {
-  kOk = 0,
-  kUnknown = 1,
-  kInternal = 2,
-  kInvalidArgument = 3,
-  kTimedOut = 4,
-  kNotFound = 5,
-  kAlreadyExists = 6,
-  kCancelled = 7,
-  kUnauthenticated = 8,
-  kUnauthorized = 9,
-  kUnimplemented = 10,
-  kUnavailable = 11,
-};
+// Async APIs
 
-/// \brief Abstract error status.
+/// \brief Transport-specific state for an async RPC.
 ///
-/// Transport implementations may use side channels (e.g. HTTP
-/// trailers) to convey additional information to reconstruct the
-/// original C++ status for implementations that can use it.
-struct ARROW_FLIGHT_EXPORT TransportStatus {
-  TransportStatusCode code;
-  std::string message;
-
-  /// \brief Convert a C++ status to an abstract transport status.
-  static TransportStatus FromStatus(const Status& arrow_status);
-
-  /// \brief Reconstruct a string-encoded TransportStatus.
-  static TransportStatus FromCodeStringAndMessage(const std::string& code_str,
-                                                  std::string message);
+/// Transport implementations may subclass this to store their own
+/// state, and stash an instance in a user-supplied AsyncListener via
+/// ClientTransport::GetAsyncRpc and ClientTransport::SetAsyncRpc.
+class AsyncRpc {
+ public:
+  virtual ~AsyncRpc() = default;
+  /// \brief Request cancellation of the RPC.
+  virtual void TryCancel() {}
 
-  /// \brief Convert an abstract transport status to a C++ status.
-  Status ToStatus() const;
+  /// Only needed for DoPut/DoExchange
+  virtual void Begin(std::shared_ptr<Schema> schema, ipc::IpcWriteOptions options) {}
+  /// Only needed for DoPut/DoExchange
+  virtual void Write(arrow::flight::FlightStreamChunk chunk, bool last) {}
+  /// Only needed for DoPut/DoExchange
+  virtual void DoneWriting() {}

Review Comment:
   ("This" being the `last` parameter, that is.)
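
For context, the doc comment in the diff describes `last = true` as semantically equivalent to calling Write and then DoneWriting. The sketch below only illustrates that equivalence; `RecordingWriter` is a hypothetical class that logs what it would send, and it is not the transport implementation from the PR.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical writer that records the events it would put on the wire.
class RecordingWriter {
 public:
  // If `last` is true, the stream is closed right after this chunk.
  void Write(std::string chunk, bool last = false) {
    events_.push_back("write:" + std::move(chunk));
    if (last) DoneWriting();
  }

  // Idempotent: closing an already-closed stream records nothing.
  void DoneWriting() {
    if (done_) return;
    done_ = true;
    events_.push_back("done");
  }

  const std::vector<std::string>& events() const { return events_; }

 private:
  std::vector<std::string> events_;
  bool done_ = false;
};
```

Both call sequences produce the same observable events, so dropping the flag would lose only the chance for a transport to coalesce the final message with the end-of-stream signal.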





[GitHub] [arrow] pitrou commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1607579762

   For the record, if you added support for a single method (for example DoGet?), it might make reviewing a bit easier?
   




[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242095494


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   Not here, I think: it isn't possible to implement with gRPC. Is that really a proven mechanism? Did Acero ever actually implement it, or was it only a theoretical API?





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238790196


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)
+  - How does this look for writes?
+  - Maybe we always call AddHold and RemoveHold? (It's unclear to me
+    why you'd want to read/write after the stream ends...)
+
+- StopToken needs to be able to run a callback, so that we can cancel gRPC calls.

Review Comment:
   I don't think there's a scope that would work, so we would have to explicitly deregister it before calling OnFinish.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238790687


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)
+  - How does this look for writes?
+  - Maybe we always call AddHold and RemoveHold? (It's unclear to me
+    why you'd want to read/write after the stream ends...)
+
+- StopToken needs to be able to run a callback, so that we can cancel gRPC calls.

Review Comment:
   Or I suppose if we make it the caller's responsibility, it can be deregistered when the listener itself is cleaned up.
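
   A minimal sketch of that caller-owned deregistration, assuming a cancellation source that can run callbacks. `SketchStopSource` and `StopRegistration` are hypothetical names for illustration; they are not Arrow's `StopToken` API, which per the TODO above would first need callback support.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <utility>

// Hypothetical cancellation source that can run callbacks when a stop is
// requested. Not thread-safe; a real version would need locking.
class SketchStopSource {
 public:
  int Register(std::function<void()> callback) {
    int id = next_id_++;
    callbacks_[id] = std::move(callback);
    return id;
  }
  void Deregister(int id) { callbacks_.erase(id); }
  void RequestStop() {
    for (auto& entry : callbacks_) entry.second();
  }

 private:
  int next_id_ = 0;
  std::map<int, std::function<void()>> callbacks_;
};

// RAII handle that a listener could hold as a member: the callback is
// deregistered when the handle (and so the listener) is destroyed.
class StopRegistration {
 public:
  StopRegistration(SketchStopSource* source, std::function<void()> callback)
      : source_(source), id_(source->Register(std::move(callback))) {}
  ~StopRegistration() { source_->Deregister(id_); }
  StopRegistration(const StopRegistration&) = delete;
  StopRegistration& operator=(const StopRegistration&) = delete;

 private:
  SketchStopSource* source_;
  int id_;
};
```

   In this shape the callback would call something like TryCancel() on the listener, and the source must outlive every registration made against it.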





[GitHub] [arrow] lidavidm commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1604634820

   Async DoPut is implemented, though one case is not yet handled. I also haven't exposed backpressure yet, and we probably need some mutexes.




[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238773601


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;

Review Comment:
   I think it's best to not have multiple error paths. If we return a Status here, what is the implementation supposed to do with it? It can at best just give it straight back to OnFinish.
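
   A rough sketch of what a single error path could look like from the application side. `SketchStatus`, `SketchListener`, and `CollectingListener` are hypothetical stand-ins for illustration, not the types in this PR: OnNext returns nothing, and an application-side failure is remembered and reported once, from OnFinish.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for TransportStatus; not the real Arrow type.
struct SketchStatus {
  bool ok;
  std::string message;
};

// Same callback shape as the AsyncListener in the diff, minus the base class.
template <typename T>
class SketchListener {
 public:
  virtual ~SketchListener() = default;
  virtual void OnNext(T message) = 0;
  virtual void OnFinish(SketchStatus status) = 0;
};

// Records its own processing error instead of returning it from OnNext.
class CollectingListener : public SketchListener<int> {
 public:
  void OnNext(int message) override {
    if (!app_error_.ok) return;  // already failed; ignore later messages
    if (message < 0) {
      // A real listener would probably also request cancellation here.
      app_error_ = SketchStatus{false, "negative value"};
      return;
    }
    values_.push_back(message);
  }

  void OnFinish(SketchStatus status) override {
    // One place to observe the outcome: the application error wins if set,
    // otherwise the status reported by the transport is kept.
    if (app_error_.ok) {
      final_status_ = std::move(status);
    } else {
      final_status_ = app_error_;
    }
    finished_ = true;
  }

  bool finished() const { return finished_; }
  const SketchStatus& final_status() const { return final_status_; }
  const std::vector<int>& values() const { return values_; }

 private:
  SketchStatus app_error_{true, ""};
  SketchStatus final_status_{true, ""};
  bool finished_ = false;
  std::vector<int> values_;
};
```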





[GitHub] [arrow] lidavidm commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1601169751

   CC @pitrou @zeroshade @wjones127 if you were interested in async APIs.
   
   I had to add a small hook to ipc::StreamDecoder. I think it would be preferable to extract a formal "IPC state machine" from ipc/reader.cc and make sure that it's consistent with/used by both StreamDecoder and RecordBatchStreamReader.
   
   TODOs:
   
   - [ ] Figure out how to handle writes (DoPut, DoExchange)
   - [ ] Switch to using TransportStatus to report errors (and maybe provide an arrow::Status on the side, or convert it to an error detail; see below)
   - [ ] Add RPC error details to TransportStatus (this is part of Flight RPC already, but adding it to TransportStatus will help formalize it)
   - [ ] Add tests
   
   Things not handled here:
   
   - [ ] Server APIs
   - [ ] Python bindings (though I do want to sketch out how this works with asyncio or AnyIO; it seems the community may prefer AnyIO: https://github.com/apache/arrow-adbc/issues/224)
   - [ ] Enabling this to work with StopToken (we need StopToken to be able to call a callback, I think)
   - [ ] Per-read timeouts (i.e. allow a long/unlimited timeout for a streaming RPC, but stop the RPC if the server does not send a message for a long time)
   - [ ] Making the sync API wrap the async API




[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242094559


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   TryCancel is included as a method.
   
   I can switch to forcing a shared pointer again...





[GitHub] [arrow] lidavidm commented on pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #36205:
URL: https://github.com/apache/arrow/pull/36205#issuecomment-1607583473

   I can rearrange things, but I'm trying to get a complete slice of the client working in context.




[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1243877475


##########
TODOs.txt:
##########
@@ -0,0 +1,17 @@
+- Use a uniform API for all async Flight client APIs:
+
+  void Foo(const FlightCallOptions&, shared_ptr<ReadListener<T>> listener);
+
+  - shared_ptr, or raw pointer? (gRPC uses the latter...)

Review Comment:
   weak_ptr is a smart non-owning pointer, so it provides both memory safety _and_ lack of cyclic references: exactly what we seem to be after here?
   
   My experience, at least, is that trying to do async programming with raw pointers or references is extremely fragile and difficult to get right (there are all kinds of subtle issues, for example with destructors and virtual methods as you yourself found out :-)).
   
   (FTR: even in Python, I sometimes resorted to weakrefs when async programming)
   
   @westonpace Thoughts?
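
   To make the weak_ptr idea concrete, here is a minimal sketch, assuming the transport side keeps only a `std::weak_ptr` to the listener and promotes it for the duration of each callback. `SketchListener`, `SketchRpc`, and `SummingListener` are hypothetical names, not part of this PR.

```cpp
#include <cassert>
#include <memory>
#include <utility>

class SketchListener {
 public:
  virtual ~SketchListener() = default;
  virtual void OnNext(int message) = 0;
};

// Hypothetical transport-side handle. It does not own the listener, so it
// cannot form a reference cycle with it.
class SketchRpc {
 public:
  explicit SketchRpc(std::weak_ptr<SketchListener> listener)
      : listener_(std::move(listener)) {}

  // Returns true if the message was delivered, false if the listener is gone.
  bool Deliver(int message) {
    // lock() yields a shared_ptr that keeps the listener alive for the
    // duration of the callback, or an empty pointer if it was destroyed.
    std::shared_ptr<SketchListener> strong = listener_.lock();
    if (!strong) return false;
    strong->OnNext(message);
    return true;
  }

 private:
  std::weak_ptr<SketchListener> listener_;
};

class SummingListener : public SketchListener {
 public:
  void OnNext(int message) override { sum_ += message; }
  int sum() const { return sum_; }

 private:
  int sum_ = 0;
};
```

   The trade-off in this sketch is that a listener dropped by the application silently stops receiving callbacks, so the transport would still have to decide what to do with the underlying RPC in that case.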
   





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238773836


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.

Review Comment:
   Yes, this part is still very much a WIP.



##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   Right, only one Write can be 'active' at any time.
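
   For applications that produce batches faster than they can be written, one way to respect that constraint is to queue payloads and issue the next one from the write-completion callback. A minimal single-threaded sketch follows; `SerializedWriter` is a hypothetical helper, and the `issued_` vector stands in for the actual call to `IpcPutter::Write`.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Keeps at most one write in flight; everything else waits in a queue.
// Not thread-safe: a real version would need a mutex around its state.
class SerializedWriter {
 public:
  // Queue a payload and issue it right away only if no write is active.
  void Enqueue(std::string payload) {
    pending_.push_back(std::move(payload));
    MaybeIssue();
  }

  // To be called when the transport reports that the active write finished
  // (the role OnWritten plays in the diff).
  void OnWritten() {
    write_active_ = false;
    MaybeIssue();
  }

  bool write_active() const { return write_active_; }
  const std::vector<std::string>& issued() const { return issued_; }

 private:
  void MaybeIssue() {
    if (write_active_ || pending_.empty()) return;
    write_active_ = true;
    // Stand-in for handing the payload to the transport.
    issued_.push_back(std::move(pending_.front()));
    pending_.pop_front();
  }

  bool write_active_ = false;
  std::deque<std::string> pending_;
  std::vector<std::string> issued_;
};
```

   The queue here is unbounded, so this sketch only serializes writes; it does not provide backpressure on its own.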





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238770827


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   So the application cannot issue several `Write` calls if the corresponding `OnWritten` notifications haven't been received yet?





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1241960152


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   The `PauseProducing`/`ResumeProducing` mechanism might be an inspiration.





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242275235


##########
cpp/src/arrow/flight/types_async.h:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/options.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::flight {
+
+class IpcPutter;
+
+/// \defgroup flight-async Async Flight Types
+/// Common types used for asynchronous Flight APIs.
+/// @{
+
+/// \brief Non-templated state for an async RPC.
+class AsyncListenerBase {
+ public:
+  AsyncListenerBase();
+  virtual ~AsyncListenerBase();
+
+  /// \brief Request cancellation of the RPC.
+  ///
+  /// The RPC is not cancelled until AsyncListener::OnFinish is called.
+  void TryCancel();
+
+ private:
+  friend class arrow::flight::internal::ClientTransport;
+  friend class arrow::flight::IpcPutter;
+
+  /// Transport-specific state for this RPC.  Transport
+  /// implementations may store and retrieve state here via
+  /// ClientTransport::SetAsyncRpc and ClientTransport::GetAsyncRpc.
+  std::unique_ptr<internal::AsyncRpc> rpc_state_;
+};
+
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;
+  /// \brief Get the final status.
+  /// This will never be called concurrently with itself or OnNext.
+  virtual void OnFinish(TransportStatus status) = 0;
+};
+
+/// \brief Callbacks for results from async RPCs that read Arrow data.
+class ARROW_FLIGHT_EXPORT IpcListener : public AsyncListener<FlightStreamChunk> {
+ public:
+  /// \brief Get the IPC schema.
+  /// This will never be called concurrently with itself, OnNext, or OnFinish.
+  virtual void OnSchema(std::shared_ptr<Schema> schema) = 0;
+};
+
+/// \brief Callbacks for DoPut.
+class ARROW_FLIGHT_EXPORT IpcPutter : public AsyncListener<std::unique_ptr<Buffer>> {
+ public:
+  // TODO: which of these can be made const T&?
+
+  /// \brief Begin writing an IPC stream.  May only be called once.
+  ///   Must be called before writing any record batches.
+  void Begin(std::shared_ptr<Schema> schema,
+             ipc::IpcWriteOptions options = ipc::IpcWriteOptions::Defaults());
+  /// \brief Write a record batch.
+  void Write(std::shared_ptr<RecordBatch> batch) {
+    Write({std::move(batch), NULLPTR}, false);
+  }
+  /// \brief Write application metadata.  May be called before Begin.
+  void Write(std::shared_ptr<Buffer> app_metadata) {
+    Write({NULLPTR, std::move(app_metadata)}, false);
+  }
+  /// \brief Write a record batch with application metadata.
+  ///
+  /// \param[in] last If true, this is the last write on this stream.
+  ///   This may let the underlying transport optimize the write.
+  ///   Semantically equivalent to calling Write then DoneWriting.
+  void Write(FlightStreamChunk chunk, bool last = false);
+  /// \brief Indicate that the client is done writing.  Must be
+  ///   called, or OnFinish will never be called.
+  void DoneWriting();
+
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   I'll try to clarify that; it's clearly confusing and not obvious as written.





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1238791202


##########
cpp/src/arrow/flight/types_async.h:
##########
+/// \brief Callbacks for results from async RPCs.
+///
+/// A single listener may not be used for multiple concurrent RPC
+/// calls.  The application MUST hold the listener alive until
+/// OnFinish() is called and has finished.
+template <typename T>
+class ARROW_FLIGHT_EXPORT AsyncListener : public AsyncListenerBase {
+ public:
+  /// \brief Get the next server result.
+  /// This will never be called concurrently with itself or OnFinish.
+  virtual void OnNext(T message) = 0;

Review Comment:
   It can decide to automatically cancel the RPC and log the error gracefully.



##########
cpp/src/arrow/flight/types_async.h:
##########
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   But why not let the caller buffer writes?





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242192615


##########
cpp/src/arrow/flight/types_async.h:
##########
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   Well, but having only Start/StopProducing means baking that logic into a slower-moving library instead.
   
   For queue depth: on each OnWritten callback, the application would check the queue depth and enqueue messages until there's enough work in the buffer. If it wants to be more sophisticated, it can track (or we could expose) an estimate of the bytes enqueued instead. If you really want to be sophisticated, you could estimate bandwidth by also tracking the time between OnWritten callbacks.
   
   For both gRPC and UCX, we'd have to bake some variant of this logic into the library if we wanted Start/StopProducing.
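
   To make the queue-depth idea concrete, here is a rough, self-contained sketch. It does not use the Flight API: `FakeTransport`, `QueueDepthProducer`, `RunQueueDepthDemo`, and the `target_depth` parameter are all hypothetical names invented for illustration, and the "transport" is just an in-memory queue that fires an OnWritten-style callback each time one write completes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a transport with an internal write queue.
// This is NOT the Flight API; it only mimics "Write() enqueues, and a
// callback fires once a write has completed".
class FakeTransport {
 public:
  std::function<void()> on_written;  // set by the application

  void Write(std::string message) { in_flight_.push_back(std::move(message)); }
  std::size_t queue_depth() const { return in_flight_.size(); }

  // Simulate the transport finishing one write. Returns false when idle.
  bool CompleteOne() {
    if (in_flight_.empty()) return false;
    sent_.push_back(std::move(in_flight_.front()));
    in_flight_.pop_front();
    if (on_written) on_written();
    return true;
  }

  const std::vector<std::string>& sent() const { return sent_; }

 private:
  std::deque<std::string> in_flight_;
  std::vector<std::string> sent_;
};

// Application-side policy: after every completed write, top the transport
// queue back up to target_depth. The object must outlive the transport's
// use of the callback, since the callback captures `this`.
class QueueDepthProducer {
 public:
  QueueDepthProducer(FakeTransport* transport, std::deque<std::string> pending,
                     std::size_t target_depth)
      : transport_(transport),
        pending_(std::move(pending)),
        target_depth_(target_depth) {
    assert(target_depth_ > 0);
    transport_->on_written = [this] { Refill(); };
  }

  void Refill() {
    while (transport_->queue_depth() < target_depth_ && !pending_.empty()) {
      transport_->Write(std::move(pending_.front()));
      pending_.pop_front();
    }
    max_depth_seen_ = std::max(max_depth_seen_, transport_->queue_depth());
  }

  std::size_t max_depth_seen() const { return max_depth_seen_; }

 private:
  FakeTransport* transport_;
  std::deque<std::string> pending_;
  std::size_t target_depth_;
  std::size_t max_depth_seen_ = 0;
};

struct DemoResult {
  std::vector<std::string> sent;
  std::size_t max_depth;
};

// Drive the fake transport until every pending message has been "sent".
DemoResult RunQueueDepthDemo(std::size_t num_messages, std::size_t target_depth) {
  FakeTransport transport;
  std::deque<std::string> pending;
  for (std::size_t i = 0; i < num_messages; ++i) {
    pending.push_back("batch-" + std::to_string(i));
  }
  QueueDepthProducer producer(&transport, std::move(pending), target_depth);
  producer.Refill();  // prime the queue before the first completion
  while (transport.CompleteOne()) {
  }
  return {transport.sent(), producer.max_depth_seen()};
}
```

   Calling `RunQueueDepthDemo(5, 2)` sends all five messages in order while never holding more than two in the transport queue. Tracking enqueued bytes or the time between callbacks, as mentioned above, would be a change to `Refill()` only.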





[GitHub] [arrow] lidavidm commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242182038


##########
cpp/src/arrow/flight/types_async.h:
##########
+  /// \brief Begin or Write finished.  The application may Write() again.
+  virtual void OnWritten() = 0;

Review Comment:
   But Flight-level queueing is also useful to smooth over the fact that one "write" may actually be multiple gRPC-level writes.





[GitHub] [arrow] pitrou commented on a diff in pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36205:
URL: https://github.com/apache/arrow/pull/36205#discussion_r1242157632


##########
cpp/src/arrow/flight/flight_test.cc:
##########
@@ -1019,6 +1027,37 @@ TEST_F(TestFlightClient, DoAction) {
 
   ASSERT_OK_AND_ASSIGN(result, stream->Next());
   ASSERT_EQ(nullptr, result);
+
+  {
+    class Listener : public AsyncListener<Result> {
+     public:
+      void OnNext(Result result) override { results_.push_back(std::move(result)); }
+
+      void OnFinish(TransportStatus status) override {
+        if (status.ok()) {
+          future_.MarkFinished(std::move(results_));
+        } else {
+          future_.MarkFinished(status.ToStatus());
+        }
+      }
+
+      arrow::Future<std::vector<Result>> future_ =
+          arrow::Future<std::vector<Result>>::Make();
+      std::vector<Result> results_;
+    };
+    // XXX: if you don't wait for the RPC to finish, UB occurs because the
+    // subclass part of the listener will already have been destroyed, so
+    // when gRPC calls back into OnFinish the override will be gone; it
+    // will have been replaced by the "pure virtual" stub

Review Comment:
   Seems like a good argument to have a shared_ptr somewhere? :-)
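
   As a rough illustration of that idea (not code from this PR), the sketch below has the in-flight call co-own the listener through a `std::shared_ptr`, so the listener stays alive until `OnFinish` has returned even if the caller drops its own reference early. `Listener`, `FakeCall`, `RecordingListener`, and `RunSharedOwnershipDemo` are hypothetical names, and the "transport" is a plain object driven by hand.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical listener interface, loosely modeled on the one under review.
class Listener {
 public:
  virtual ~Listener() = default;
  virtual void OnNext(std::string message) = 0;
  virtual void OnFinish(bool ok) = 0;
};

// Hypothetical in-flight call that shares ownership of its listener.
class FakeCall {
 public:
  explicit FakeCall(std::shared_ptr<Listener> listener)
      : listener_(std::move(listener)) {}

  void Deliver(std::string message) { listener_->OnNext(std::move(message)); }

  void Finish(bool ok) {
    // Move the reference into a local so the listener is released only
    // after OnFinish has returned.
    std::shared_ptr<Listener> listener = std::move(listener_);
    listener->OnFinish(ok);
  }

 private:
  std::shared_ptr<Listener> listener_;
};

// Records every event, including its own destruction, into a shared log.
class RecordingListener : public Listener {
 public:
  explicit RecordingListener(std::vector<std::string>* events) : events_(events) {}
  ~RecordingListener() override { events_->push_back("destroyed"); }

  void OnNext(std::string message) override {
    events_->push_back("next:" + message);
  }
  void OnFinish(bool ok) override {
    events_->push_back(ok ? "finish:ok" : "finish:error");
  }

 private:
  std::vector<std::string>* events_;
};

std::vector<std::string> RunSharedOwnershipDemo() {
  std::vector<std::string> events;
  std::unique_ptr<FakeCall> call;
  {
    auto listener = std::make_shared<RecordingListener>(&events);
    call = std::make_unique<FakeCall>(listener);
  }  // The caller's reference is gone here; the call still holds one.
  call->Deliver("a");
  call->Deliver("b");
  call->Finish(true);
  return events;
}
```

   In this sketch the destructor can only run after `OnFinish` has returned, which is the ordering the XXX comment above is worried about. Whether the real API should take a `shared_ptr`, or leave ownership with the application as the current doc comment requires, is a design question this example does not settle.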





[GitHub] [arrow] lidavidm closed pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm closed pull request #36205: GH-16604: [C++][FlightRPC] Add async Flight client
URL: https://github.com/apache/arrow/pull/36205

