Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/30 13:27:08 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

lidavidm commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r532590005



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -87,6 +89,7 @@
   private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor;
   private final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor;
   private final List<FlightClientMiddleware.Factory> middleware;
+  private boolean retryFlightCall = false;

Review comment:
       Let's name this something more specific (`retryOnUnauthorized` perhaps).

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {
+    final Supplier<Iterator<T>> supplier;
+    final Iterator<T> itr;
+    final boolean enableRetry;
+
+    /**
+     * WrappedFlightIterator constructor.
+     * @param supplier The iterator supplier.
+     * @param enableRetry Retry if first call fails.
+     */
+    public WrappedFlightIterator(Supplier<Iterator<T>> supplier, boolean enableRetry) {
+      this.supplier = supplier;
+      this.itr = supplier.get();
+      this.enableRetry = enableRetry;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return itr.hasNext();
+      } catch (StatusRuntimeException e) {
+        if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
+          // TODO: Retry Iterator from the last known position.

Review comment:
       I _believe_ you don't have to worry about this, since the call will either return its first result or fail with UNAUTHENTICATED. (Otherwise, you'd have a very odd server that explicitly sends UNAUTHENTICATED after already giving you results.)

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {
+    final Supplier<Iterator<T>> supplier;
+    final Iterator<T> itr;
+    final boolean enableRetry;
+
+    /**
+     * WrappedFlightIterator constructor.
+     * @param supplier The iterator supplier.
+     * @param enableRetry Retry if first call fails.
+     */
+    public WrappedFlightIterator(Supplier<Iterator<T>> supplier, boolean enableRetry) {
+      this.supplier = supplier;
+      this.itr = supplier.get();
+      this.enableRetry = enableRetry;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return itr.hasNext();
+      } catch (StatusRuntimeException e) {
+        if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
+          // TODO: Retry Iterator from the last known position.
+          // TODO: Log the retry explicitly stating the token authentication failed and retrying
+          // the failed operation.
+          return supplier.get().hasNext();

Review comment:
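       This should assign the new iterator to `this.itr` (which means the field can no longer be `final`) so that subsequent `hasNext()`/`next()` calls use the retried call instead of the failed one.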
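
       To illustrate the point, here is a minimal sketch of what that could look like. It is not the PR's code: `RetryingIterator`, `FlakySupplier`, and `UnauthenticatedException` are hypothetical names, and the `shouldRetry` predicate stands in for the `enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED` check so the example compiles without gRPC on the classpath. It also assumes at most one retry is wanted.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Sketch: an iterator wrapper that replaces its delegate when it retries. */
final class RetryingIterator<T> implements Iterator<T> {
  private final Supplier<Iterator<T>> supplier;
  // Hypothetical stand-in for the UNAUTHENTICATED status check in the PR.
  private final Predicate<RuntimeException> shouldRetry;
  private Iterator<T> itr;          // not final: reassigned on retry
  private boolean retried = false;  // assumption: retry at most once

  RetryingIterator(Supplier<Iterator<T>> supplier, Predicate<RuntimeException> shouldRetry) {
    this.supplier = supplier;
    this.shouldRetry = shouldRetry;
    this.itr = supplier.get();
  }

  @Override
  public boolean hasNext() {
    try {
      return itr.hasNext();
    } catch (RuntimeException e) {
      if (retried || !shouldRetry.test(e)) {
        throw e;
      }
      retried = true;
      // Keep the new iterator so that next() and later hasNext() calls read from it.
      itr = supplier.get();
      return itr.hasNext();
    }
  }

  @Override
  public T next() {
    return itr.next();
  }
}

final class RetryingIteratorDemo {
  /** Hypothetical exception standing in for an UNAUTHENTICATED StatusRuntimeException. */
  static final class UnauthenticatedException extends RuntimeException {}

  /** Supplier whose first iterator fails in hasNext() and whose later iterators succeed. */
  static final class FlakySupplier implements Supplier<Iterator<String>> {
    int calls = 0;

    @Override
    public Iterator<String> get() {
      calls++;
      if (calls == 1) {
        return new Iterator<String>() {
          @Override public boolean hasNext() { throw new UnauthenticatedException(); }
          @Override public String next() { throw new NoSuchElementException(); }
        };
      }
      return Arrays.asList("a", "b").iterator();
    }
  }

  public static void main(String[] args) {
    FlakySupplier supplier = new FlakySupplier();
    Iterator<String> it =
        new RetryingIterator<>(supplier, e -> e instanceof UnauthenticatedException);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```

       Without the reassignment, `next()` would still be called on the iterator from the failed call even though `hasNext()` reported a result from the new one.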

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CredentialCallOption.java
##########
@@ -28,7 +28,11 @@
  * Method option for supplying credentials to method calls.
  */
 public class CredentialCallOption implements CallOptions.GrpcCallOption {
-  private final Consumer<CallHeaders> credentialWriter;
+  public Consumer<CallHeaders> getCredentialWriter() {

Review comment:
       nit: can we keep getters after fields/constructor?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {

Review comment:
       A general comment in the interest of code reuse: you may be better off implementing a generic `StreamObserver<T>` that can wrap any gRPC call. You'd have to change `FlightClient` to always use the async gRPC stub instead of the blocking stub, but then you can handle retries for any gRPC call without writing a special-purpose wrapper for each one.
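
       A rough sketch of the observer-based idea, under several assumptions: `Observer<T>` below is a local stand-in that only mirrors the three callbacks of gRPC's `StreamObserver<T>` (`onNext`, `onError`, `onCompleted`) so the example is self-contained, and `RetryingObserver`, `startCall`, and `shouldRetry` are hypothetical names. It retries once, and only if no result has been delivered yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Local stand-in with the same three callbacks as gRPC's StreamObserver. */
interface Observer<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

/** Sketch: forwards events downstream and restarts the call once on a retryable error. */
final class RetryingObserver<T> implements Observer<T> {
  private final Consumer<Observer<T>> startCall;   // hypothetical: issues the async call
  private final Observer<T> downstream;
  private final Predicate<Throwable> shouldRetry;  // stand-in for the UNAUTHENTICATED check
  private boolean retried = false;
  private boolean sawResult = false;

  RetryingObserver(
      Consumer<Observer<T>> startCall, Observer<T> downstream, Predicate<Throwable> shouldRetry) {
    this.startCall = startCall;
    this.downstream = downstream;
    this.shouldRetry = shouldRetry;
  }

  /** Issues the first attempt with this observer as the response handler. */
  void start() {
    startCall.accept(this);
  }

  @Override
  public void onNext(T value) {
    sawResult = true;
    downstream.onNext(value);
  }

  @Override
  public void onError(Throwable t) {
    // Retry only before any result was delivered, and only once.
    if (!retried && !sawResult && shouldRetry.test(t)) {
      retried = true;
      startCall.accept(this);
      return;
    }
    downstream.onError(t);
  }

  @Override
  public void onCompleted() {
    downstream.onCompleted();
  }
}

final class RetryingObserverDemo {
  public static void main(String[] args) {
    int[] attempts = {0};
    List<String> events = new ArrayList<>();
    Observer<String> sink = new Observer<String>() {
      @Override public void onNext(String v) { events.add("next:" + v); }
      @Override public void onError(Throwable t) { events.add("error"); }
      @Override public void onCompleted() { events.add("done"); }
    };
    // Fake call: the first attempt fails, the second streams one value.
    RetryingObserver<String> observer = new RetryingObserver<>(
        o -> {
          if (attempts[0]++ == 0) {
            o.onError(new IllegalStateException("unauthenticated"));
          } else {
            o.onNext("x");
            o.onCompleted();
          }
        },
        sink,
        t -> t instanceof IllegalStateException);
    observer.start();
    System.out.println(events);
  }
}
```

       The same wrapper would then work for any call that delivers its responses through an observer, since the only call-specific piece is the `startCall` function.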

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientIncomingAuthHeaderMiddleware.java
##########
@@ -34,27 +34,29 @@
    */
   public static class Factory implements FlightClientMiddleware.Factory {

Review comment:
       This is what I intended in the doc - thank you!



