Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/26 19:38:31 UTC

[GitHub] [arrow] keeratsingh opened a new pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

keeratsingh opened a new pull request #8780:
URL: https://github.com/apache/arrow/pull/8780


   - This is a POC for the proposed design [Link](https://docs.google.com/document/d/187DlGpIpOUPGhWvXVQEq0mXw_hdWjzzOuZp0p5qzBp0/edit?usp=sharing)
   - This POC only adds the retry capability to ListFlights for now. Once we have agreement on the design, the other Flight APIs will be changed to support retry as well.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-865260887


   Following up here - given 5.0.0 is targeting July, do you think this will be ready in time?





[GitHub] [arrow] lidavidm commented on a change in pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r535624785



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {

Review comment:
       Sorry - let me clarify. What I mean is that gRPC offers 3 types of clients in Java: a blocking client, async client, and a "generic" client. This last client offers a unified interface for all types of calls: it accepts a StreamObserver which gets called with data from the server and it is given a StreamObserver to send data to the server. So if you were to rewrite all gRPC calls in terms of this last type of client, then you could implement a single StreamObserver wrapper that handles retries for all calls.
   
   Not a requirement - just offering a suggestion to avoid having to implement lots of single-purpose retries. It would take a decent amount of refactoring.
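   
   To illustrate the suggestion above, here is a minimal sketch of a retrying observer wrapper. It is not code from the PR: `Observer` is a local stand-in that mirrors the three callbacks of gRPC's `StreamObserver` so the snippet compiles on its own, and `RetryingObserver`, `shouldRetry`, and the retry-once policy are hypothetical names and choices made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class RetrySketch {
  /** Local stand-in with the same callbacks as io.grpc.stub.StreamObserver. */
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable t);

    void onCompleted();
  }

  /** Forwards events to a delegate and re-issues the call once on a retryable error. */
  static final class RetryingObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    private final Consumer<Observer<T>> call;
    private final Predicate<Throwable> shouldRetry;
    private boolean retried;
    private boolean sawData;

    RetryingObserver(Observer<T> delegate, Consumer<Observer<T>> call,
                     Predicate<Throwable> shouldRetry) {
      this.delegate = delegate;
      this.call = call;
      this.shouldRetry = shouldRetry;
    }

    void start() {
      call.accept(this);
    }

    @Override
    public void onNext(T value) {
      sawData = true;
      delegate.onNext(value); // values are streamed through, never buffered
    }

    @Override
    public void onError(Throwable t) {
      // Retry only if nothing was delivered yet, so the delegate never sees duplicates.
      if (!retried && !sawData && shouldRetry.test(t)) {
        retried = true;
        call.accept(this);
      } else {
        delegate.onError(t);
      }
    }

    @Override
    public void onCompleted() {
      delegate.onCompleted();
    }
  }

  /** Simulates a call that fails once (as if the token had expired) and then succeeds. */
  static List<String> demo() {
    final List<String> events = new ArrayList<>();
    final int[] attempts = {0};
    final Consumer<Observer<String>> call = observer -> {
      if (attempts[0]++ == 0) {
        observer.onError(new IllegalStateException("token expired"));
        return;
      }
      observer.onNext("a");
      observer.onNext("b");
      observer.onCompleted();
    };
    final Observer<String> sink = new Observer<String>() {
      @Override
      public void onNext(String value) {
        events.add(value);
      }

      @Override
      public void onError(Throwable t) {
        events.add("error");
      }

      @Override
      public void onCompleted() {
        events.add("done");
      }
    };
    new RetryingObserver<>(sink, call, t -> t instanceof IllegalStateException).start();
    return events;
  }
}
```

   Because the wrapper only sees the three observer callbacks, the same class could in principle sit in front of any call type. A real version would also need to refresh credentials before re-issuing the call and to consider thread safety.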







[GitHub] [arrow] keeratsingh commented on a change in pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
keeratsingh commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r534469821



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {

Review comment:
       @lidavidm Thank you for your feedback.
   
   I have pushed a PS with the suggested changes.
   
   I currently have a blocking call [[link]](https://github.com/apache/arrow/pull/8780/commits/2fc2971979a1fa93346948d2da9cba91a8399c43#diff-5f196a1c597e5a8d9c3cd835dae4db594b6204cb2d681c494d8ed3906cf47478R47) in the API calls that return an Iterable, like `listFlights` and `listActions`, because the iterable needs to be populated before we return it to the FlightClient. Otherwise we are back to the same approach of using a wrapped iterator.
   
   In the latest PS, `doClientCall` only caters to API calls that return an iterable/iterator; the next step is to make it generic for calls like `getInfo` and `getSchema`.
   Before I go ahead and do that, I wanted to confirm whether this is what you had in mind when you suggested a generic StreamObserver above.
   
   Another option I had in mind was to pass the functional interfaces to the `ApiCallObserver` and have it retry the API call in `onError`. In that case we would not need to block `doClientCall` until the `ApiCallObserver` completes, as the API call would be retried asynchronously when the observer encounters an error.
   







[GitHub] [arrow] keeratsingh commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
keeratsingh commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-874285499


   > Following up here - given 5.0.0 is targeting July, do you think this will be ready in time?
   
   Hey @lidavidm I am no longer working on this project. @tifflhl or @kylep-dremio would be better able to give you a definitive answer.


To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org




[GitHub] [arrow] kylep-dremio commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
kylep-dremio commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-928161670


   @lidavidm - correct, we'll circle back to this but current priority is the C++ Flight SQL PR.





[GitHub] [arrow] lidavidm commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-928091968


   @kylep-dremio Just to follow up here, I'm assuming this isn't a priority at the moment?





[GitHub] [arrow] lidavidm commented on a change in pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r535644940



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {

Review comment:
       So I see you have switched to using the generic client; my comment is then more about the level of abstraction appropriate for implementing retry. In particular, keeping the logic inside a wrapper StreamObserver would, IMO, make it easier to preserve things like the streaming nature of calls without having to buffer results.







[GitHub] [arrow] lidavidm commented on a change in pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r535635203



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ClientApiCallWrapper.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Utility class for executing an API calls on the FlightServer.
+ */
+public class ClientApiCallWrapper {
+
+  /**
+   * Execute client call using the Stream Observer ApiCallObserver.
+   * @param apiCall Function call to execute.
+   * @param <T> Type of the values in the stream.
+   * @return An iterable of type T.
+   */
+  private static <T> Iterable<T> doClientCall(Consumer<ApiCallObserver<T>> apiCall) {
+    final ApiCallObserver<T> observer = new ApiCallObserver<>();
+    try {
+      apiCall.accept(observer);
+      while (!observer.completed.get()) {/* Wait for results */}

Review comment:
       Can we use a condition variable instead of busy-waiting?
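   
   One way to block without spinning is sketched below, using `java.util.concurrent.CountDownLatch` from the JDK. This is illustrative only and not taken from the PR: `LatchObserver`, its `await` method, and the five-second timeout in `demo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchWaitSketch {
  /** Collects results and lets a caller sleep until the stream finishes. */
  static final class LatchObserver<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final List<T> results = new ArrayList<>();
    private Throwable error;

    void onNext(T value) {
      results.add(value);
    }

    void onError(Throwable t) {
      error = t;
      done.countDown();
    }

    void onCompleted() {
      done.countDown();
    }

    /** Parks the calling thread until completion, an error, or the timeout. */
    List<T> await(long timeout, TimeUnit unit) throws InterruptedException {
      if (!done.await(timeout, unit)) {
        throw new IllegalStateException("Timed out waiting for the call to finish");
      }
      // countDown() happens-before a successful await(), so the writes above are visible here.
      if (error != null) {
        throw new IllegalStateException(error);
      }
      return results;
    }
  }

  /** Runs a producer on another thread and waits for it without busy-waiting. */
  static List<String> demo() {
    final LatchObserver<String> observer = new LatchObserver<>();
    new Thread(() -> {
      observer.onNext("x");
      observer.onNext("y");
      observer.onCompleted();
    }).start();
    try {
      return observer.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

   The waiting thread is parked by the latch rather than looping, and the timeout keeps a lost completion signal from hanging the caller forever.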

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ClientApiCallWrapper.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.arrow.flight;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Utility class for executing an API calls on the FlightServer.
+ */
+public class ClientApiCallWrapper {
+
+  /**
+   * Execute client call using the Stream Observer ApiCallObserver.

Review comment:
       nit: more specifically, this converts an async callback-based call into a synchronous iterator

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestRetryWithExpiredToken.java
##########
@@ -0,0 +1,241 @@
+
+package org.apache.arrow.flight;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.arrow.flight.auth2.Auth2Constants;
+import org.apache.arrow.flight.auth2.AuthUtilities;
+import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator;
+import org.apache.arrow.flight.auth2.BearerTokenAuthenticator;
+import org.apache.arrow.flight.auth2.CallHeaderAuthenticator;
+import org.apache.arrow.flight.impl.Flight;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+
+public class TestRetryWithExpiredToken {
+
+  private static final String USERNAME_1 = "flight1";
+  private static final String PASSWORD_1 = "woohoo1";
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+  private FlightClient client2;
+
+  @Before
+  public void setup() throws Exception {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+    startServerAndClient();
+  }
+
+  private FlightProducer getFlightProducer() {
+    return new NoOpFlightProducer() {
+      @Override
+      public void listFlights(CallContext context, Criteria criteria,
+                              StreamListener<FlightInfo> listener) {
+        if (criteria.getExpression().length > 0) {
+          // Don't send anything if criteria are set
+          listener.onCompleted();
+        }
+        try {
+          listener.onNext(new FlightInfo(Flight.FlightInfo.newBuilder()
+                  .setFlightDescriptor(Flight.FlightDescriptor.newBuilder()
+                          .setType(Flight.FlightDescriptor.DescriptorType.CMD)
+                          .setCmd(ByteString.copyFrom("flight1", Charsets.UTF_8)))
+                  .build()));
+        } catch (URISyntaxException e) {
+          listener.onError(e);
+          return;
+        }
+        listener.onCompleted();
+      }
+
+      @Override
+      public void listActions(CallContext context,
+                              StreamListener<ActionType> listener) {
+        listener.onNext(new ActionType(Flight.ActionType.newBuilder()
+                .setDescription("action description1")
+                .setType("action type1")
+                .build()));
+        listener.onNext(new ActionType(Flight.ActionType.newBuilder()
+                .setDescription("action description2")
+                .setType("action type2")
+                .build()));
+        listener.onCompleted();
+      }
+
+      @Override
+      public void doAction(CallContext context, Action action,
+                           StreamListener<Result> listener) {
+        listener.onNext(new Result("action1".getBytes(Charsets.UTF_8)));
+        listener.onNext(new Result("action2".getBytes(Charsets.UTF_8)));
+        listener.onCompleted();
+      }
+
+      @Override
+      public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+        if (!context.peerIdentity().equals(USERNAME_1)) {
+          listener.error(new IllegalArgumentException("Invalid username"));
+          return;
+        }
+        final Schema pojoSchema = new Schema(ImmutableList.of(Field.nullable("a",
+                Types.MinorType.BIGINT.getType())));
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(pojoSchema, allocator)) {
+          listener.start(root);
+          root.allocateNew();
+          root.setRowCount(4095);
+          listener.putNext();
+          listener.completed();
+        }
+      }
+    };
+  }
+
+  private void startServerAndClient() throws IOException {
+    final FlightProducer flightProducer = getFlightProducer();
+    this.server = FlightTestUtil.getStartedServer((location) -> FlightServer
+            .builder(allocator, location, flightProducer)
+            .headerAuthenticator(new GeneratedTestBearerTokenAuthenticator(
+                    new BasicCallHeaderAuthenticator(this::validate)))
+            .build());
+
+    this.client = FlightClient.builder(allocator, server.getLocation())
+            .build();
+  }
+
+  @After
+  public void shutdown() throws Exception {
+    AutoCloseables.close(client, client2, server, allocator);
+    client = null;
+    client2 = null;
+    server = null;
+    allocator = null;
+  }
+
+  private CallHeaderAuthenticator.AuthResult validate(String username, String password) {
+    if (Strings.isNullOrEmpty(username)) {
+      throw CallStatus.UNAUTHENTICATED.withDescription("Credentials not supplied.").toRuntimeException();
+    }
+    final String identity;
+    if (USERNAME_1.equals(username) && PASSWORD_1.equals(password)) {
+      identity = USERNAME_1;
+    } else {
+      throw CallStatus.UNAUTHENTICATED.withDescription("Username or password is invalid.").toRuntimeException();
+    }
+    return () -> identity;
+  }
+
+  @Test
+  public void testListFlightsWithRetry() {
+    client.authenticateBasicToken(USERNAME_1, PASSWORD_1);
+    Iterable<FlightInfo> flights = client.listFlights(Criteria.ALL);
+    int count = 0;
+    for (FlightInfo flight : flights) {
+      count += 1;
+      Assert.assertArrayEquals(flight.getDescriptor().getCommand(), "flight1".getBytes(Charsets.UTF_8));
+    }
+    Assert.assertEquals(1, count);
+  }
+
+  @Test
+  public void testListActionsWithRetry() {
+    client.authenticateBasicToken(USERNAME_1, PASSWORD_1);
+    Assert.assertFalse(ImmutableList.copyOf(client
+            .listActions())
+            .isEmpty());
+    Assert.assertFalse(ImmutableList.copyOf(client
+            .listActions())
+            .isEmpty());
+  }
+
+  @Test
+  public void testDoActionWithRetry() {
+    client.authenticateBasicToken(USERNAME_1, PASSWORD_1);
+    Assert.assertFalse(ImmutableList.copyOf(client
+            .doAction(new Action("hello")))
+            .isEmpty());
+    Assert.assertFalse(ImmutableList.copyOf(client
+            .doAction(new Action("world")))
+            .isEmpty());
+  }
+
+
+  /**
+   * Generates and caches bearer tokens from user credentials.

Review comment:
       Can we make it clear here that this class generates bearer tokens with a sequentially increasing ID, and it considers token #1 to be 'expired'? It was a little unclear how the retry was getting tested at first.
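   
   The behaviour described in this comment could look roughly like the sketch below. It is not the PR's test authenticator: `SequentialTokenIssuer`, the `token-` prefix, and the validity rule are assumptions made only to show how treating token #1 as expired forces exactly one retry.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SequentialTokenSketch {
  /** Issues tokens with a sequentially increasing ID and treats token #1 as expired. */
  static final class SequentialTokenIssuer {
    private static final String PREFIX = "token-";
    private final AtomicInteger nextId = new AtomicInteger(1);

    String issue() {
      return PREFIX + nextId.getAndIncrement();
    }

    boolean isValid(String token) {
      if (token == null || !token.startsWith(PREFIX)) {
        return false;
      }
      final int id;
      try {
        id = Integer.parseInt(token.substring(PREFIX.length()));
      } catch (NumberFormatException e) {
        return false;
      }
      // Token #1 is always rejected, so the first authenticated call must retry.
      // IDs that were never issued are rejected as well.
      return id > 1 && id < nextId.get();
    }
  }
}
```

   With this rule a client that logs in, receives token #1, and makes a call is rejected once, logs in again, receives token #2, and then succeeds.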

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ClientApiCallWrapper.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.arrow.flight;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Utility class for executing an API calls on the FlightServer.

Review comment:
       nit: can we make this more descriptive (this is a set of utilities for making RPC calls with optional retries)?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ClientApiCallWrapper.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.arrow.flight;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Utility class for executing an API calls on the FlightServer.
+ */
+public class ClientApiCallWrapper {
+
+  /**
+   * Execute client call using the Stream Observer ApiCallObserver.
+   * @param apiCall Function call to execute.
+   * @param <T> Type of the values in the stream.
+   * @return An iterable of type T.
+   */
+  private static <T> Iterable<T> doClientCall(Consumer<ApiCallObserver<T>> apiCall) {
+    final ApiCallObserver<T> observer = new ApiCallObserver<>();
+    try {
+      apiCall.accept(observer);
+      while (!observer.completed.get()) {/* Wait for results */}
+      return observer.result;
+    } catch (ExecutionException e) {
+      throw StatusUtils.fromThrowable(e.getCause());
+    } catch (InterruptedException e) {
+      throw StatusUtils.fromThrowable(e);
+    }
+  }
+
+  /**
+   * This method calls the method doClientCall to get the results iterator and use that to get the
+   * results.
+   * @param apiCall The underlying Flight API call to execute.
+   * @param result Functional interface that returns the results.
+   * @param retryOnUnauthorized Should the API call be retried if it fails during the initial call.
+   * @param <T> Type of the values in the stream.
+   * @param <R> Type of the return value from this method.
+   * @return An iterable of type R.
+   */
+  static <T, R> Iterable<R> callFlightApi(Consumer<ApiCallObserver<T>> apiCall,
+                                          Function<Iterator<T>, Iterator<R>> result,
+                                          boolean retryOnUnauthorized) {
+    try {
+      final Iterator<T> iterator = doClientCall(apiCall).iterator();
+      return () -> result.apply(iterator);
+    } catch (FlightRuntimeException ex) {

Review comment:
       This should only retry on the appropriate status right?
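   
   A sketch of a status-aware retry follows. `StatusCode` and `CallFailedException` are local stand-ins so the snippet is self-contained, and `callWithAuthRetry` and `refreshCredentials` are hypothetical names. In Flight itself the check would presumably be made on the status carried by `FlightRuntimeException`.

```java
import java.util.function.Supplier;

final class StatusRetrySketch {
  /** Stand-in for an RPC status code enum. */
  enum StatusCode { UNAUTHENTICATED, UNAVAILABLE, INTERNAL }

  /** Stand-in for an RPC exception that carries a status code. */
  static final class CallFailedException extends RuntimeException {
    final StatusCode code;

    CallFailedException(StatusCode code) {
      super(code.name());
      this.code = code;
    }
  }

  /** Retries the call once, and only when the failure was an authentication failure. */
  static <R> R callWithAuthRetry(Supplier<R> call, Runnable refreshCredentials) {
    try {
      return call.get();
    } catch (CallFailedException ex) {
      if (ex.code != StatusCode.UNAUTHENTICATED) {
        throw ex; // any other failure is surfaced unchanged
      }
      refreshCredentials.run();
      return call.get(); // a second failure propagates to the caller
    }
  }
}
```

   Errors such as `INTERNAL` are not retried here, because re-sending the request with fresh credentials would not change their outcome.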

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ClientApiCallWrapper.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.arrow.flight;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Utility class for executing an API calls on the FlightServer.
+ */
+public class ClientApiCallWrapper {
+
+  /**
+   * Execute client call using the Stream Observer ApiCallObserver.
+   * @param apiCall Function call to execute.
+   * @param <T> Type of the values in the stream.
+   * @return An iterable of type T.
+   */
+  private static <T> Iterable<T> doClientCall(Consumer<ApiCallObserver<T>> apiCall) {
+    final ApiCallObserver<T> observer = new ApiCallObserver<>();
+    try {
+      apiCall.accept(observer);
+      while (!observer.completed.get()) {/* Wait for results */}
+      return observer.result;
+    } catch (ExecutionException e) {
+      throw StatusUtils.fromThrowable(e.getCause());
+    } catch (InterruptedException e) {
+      throw StatusUtils.fromThrowable(e);
+    }
+  }
+
+  /**
+   * This method calls the method doClientCall to get the results iterator and use that to get the
+   * results.
+   * @param apiCall The underlying Flight API call to execute.
+   * @param result Functional interface that returns the results.
+   * @param retryOnUnauthorized Should the API call be retried if it fails during the initial call.
+   * @param <T> Type of the values in the stream.
+   * @param <R> Type of the return value from this method.
+   * @return An iterable of type R.
+   */
+  static <T, R> Iterable<R> callFlightApi(Consumer<ApiCallObserver<T>> apiCall,
+                                          Function<Iterator<T>, Iterator<R>> result,
+                                          boolean retryOnUnauthorized) {
+    try {
+      final Iterator<T> iterator = doClientCall(apiCall).iterator();
+      return () -> result.apply(iterator);
+    } catch (FlightRuntimeException ex) {
+      if (retryOnUnauthorized) {
+        final Iterator<T> iterator = doClientCall(apiCall).iterator();
+        return () -> result.apply(iterator);
+      }
+      throw ex;
+    }
+  }
+
+  /**
+   * Call observer that implements a StreamObserver and stores the values in the stream into
+   * and iterable as it encounters them.
+   * @param <T> Type of the values in the stream.
+   */
+  static class ApiCallObserver<T> implements StreamObserver<T> {
+    private final CompletableFuture<Boolean> completed;
+    private final List<T> result = new ArrayList<>();

Review comment:
       This means we now build up the entire result set in memory instead of being able to stream it.
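   
   For contrast, a sketch of an observer that hands values to the consumer as they arrive instead of collecting them in a list. It is not from the PR: `QueueingObserver` and `Failure` are hypothetical, the queue is unbounded for brevity (a real implementation would need flow control to bound memory), and null values are not supported.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class StreamingIteratorSketch {
  /** Wraps an error so it can travel through the queue in order with the data. */
  private static final class Failure {
    final Throwable cause;

    Failure(Throwable cause) {
      this.cause = cause;
    }
  }

  /** Receives callbacks on one thread and exposes them as a blocking iterator on another. */
  static final class QueueingObserver<T> implements Iterator<T> {
    private static final Object END = new Object();
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object next; // null means "nothing fetched yet"

    void onNext(T value) {
      queue.add(value);
    }

    void onError(Throwable t) {
      queue.add(new Failure(t));
    }

    void onCompleted() {
      queue.add(END);
    }

    @Override
    public boolean hasNext() {
      if (next == null) {
        try {
          next = queue.take(); // blocks until the producer delivers something
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        }
      }
      if (next instanceof Failure) {
        throw new IllegalStateException(((Failure) next).cause);
      }
      return next != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      final T value = (T) next;
      next = null;
      return value;
    }
  }

  /** Produces two values on another thread and consumes them through the iterator. */
  static List<String> demo() {
    final QueueingObserver<String> observer = new QueueingObserver<>();
    new Thread(() -> {
      observer.onNext("1");
      observer.onNext("2");
      observer.onCompleted();
    }).start();
    final List<String> seen = new ArrayList<>();
    while (observer.hasNext()) {
      seen.add(observer.next());
    }
    return seen;
  }
}
```

   The trade-off is that a retry becomes harder once values have been handed out: the first element may already have been consumed by the time an error arrives.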

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ClientApiCallWrapper.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.arrow.flight;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Utility class for executing an API calls on the FlightServer.
+ */
+public class ClientApiCallWrapper {
+
+  /**
+   * Execute client call using the Stream Observer ApiCallObserver.
+   * @param apiCall Function call to execute.
+   * @param <T> Type of the values in the stream.
+   * @return An iterable of type T.
+   */
+  private static <T> Iterable<T> doClientCall(Consumer<ApiCallObserver<T>> apiCall) {
+    final ApiCallObserver<T> observer = new ApiCallObserver<>();
+    try {
+      apiCall.accept(observer);
+      while (!observer.completed.get()) {/* Wait for results */}
+      return observer.result;
+    } catch (ExecutionException e) {
+      throw StatusUtils.fromThrowable(e.getCause());
+    } catch (InterruptedException e) {
+      throw StatusUtils.fromThrowable(e);
+    }
+  }
+
+  /**
+   * This method calls the method doClientCall to get the results iterator and use that to get the
+   * results.
+   * @param apiCall The underlying Flight API call to execute.
+   * @param result Functional interface that returns the results.
+   * @param retryOnUnauthorized Should the API call be retried if it fails during the initial call.
+   * @param <T> Type of the values in the stream.
+   * @param <R> Type of the return value from this method.
+   * @return An iterable of type R.
+   */
+  static <T, R> Iterable<R> callFlightApi(Consumer<ApiCallObserver<T>> apiCall,
+                                          Function<Iterator<T>, Iterator<R>> result,
+                                          boolean retryOnUnauthorized) {
+    try {
+      final Iterator<T> iterator = doClientCall(apiCall).iterator();
+      return () -> result.apply(iterator);
+    } catch (FlightRuntimeException ex) {

Review comment:
       Also, doesn't this need to somehow reset the middleware so that it sends the basic auth credentials and not the bearer token?
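
One way to picture the reset asked about here is a small holder that the outgoing auth middleware reads on every call. This is only a sketch under assumptions: `AuthHeaderState`, `onBearerToken`, and `reset` are hypothetical names, not Flight APIs, and the real change would presumably live in the client middleware factory. It shows the bearer token being dropped after a rejection so that the retried call falls back to the basic auth header.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical holder for the Authorization header value.
 * Not part of the Flight API; names are for illustration only.
 */
final class AuthHeaderState {
  private final String basicHeader;
  private String bearerToken; // null until the server has issued a token

  AuthHeaderState(String user, String password) {
    final String raw = user + ":" + password;
    this.basicHeader = "Basic " +
        Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }

  /** Remember the token returned by the server after a successful basic auth call. */
  synchronized void onBearerToken(String token) {
    this.bearerToken = token;
  }

  /** Forget a rejected token so the next call sends basic auth credentials again. */
  synchronized void reset() {
    this.bearerToken = null;
  }

  /** Header value the outgoing middleware would write for the next call. */
  synchronized String authorizationHeader() {
    return bearerToken != null ? "Bearer " + bearerToken : basicHeader;
  }
}
```

With something like this in place, the retry path would call `reset()` before re-issuing the request, so the second attempt re-authenticates instead of resending the expired token.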







[GitHub] [arrow] github-actions[bot] commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-734457340


   Thanks for opening a pull request!
   
   Could you open an issue for this pull request on JIRA?
   https://issues.apache.org/jira/browse/ARROW
   
   Then could you also rename pull request title in the following format?
   
       ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   





[GitHub] [arrow] lidavidm commented on a change in pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#discussion_r532590005



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -87,6 +89,7 @@
   private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor;
   private final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor;
   private final List<FlightClientMiddleware.Factory> middleware;
+  private boolean retryFlightCall = false;

Review comment:
       Let's name this something more specific (`retryOnUnauthorized` perhaps)

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {
+    final Supplier<Iterator<T>> supplier;
+    final Iterator<T> itr;
+    final boolean enableRetry;
+
+    /**
+     * WrappedFlightIterator constructor.
+     * @param supplier The iterator supplier.
+     * @param enableRetry Retry if first call fails.
+     */
+    public WrappedFlightIterator(Supplier<Iterator<T>> supplier, boolean enableRetry) {
+      this.supplier = supplier;
+      this.itr = supplier.get();
+      this.enableRetry = enableRetry;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return itr.hasNext();
+      } catch (StatusRuntimeException e) {
+        if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
+          // TODO: Retry Iterator from the last known position.

Review comment:
       I _believe_ you don't have to worry about this since you will either get the first result or UNAUTHENTICATED. (Or else, you'd have a very odd server that explicitly sends an UNAUTHENTICATED after giving you results.)

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {
+    final Supplier<Iterator<T>> supplier;
+    final Iterator<T> itr;
+    final boolean enableRetry;
+
+    /**
+     * WrappedFlightIterator constructor.
+     * @param supplier The iterator supplier.
+     * @param enableRetry Retry if first call fails.
+     */
+    public WrappedFlightIterator(Supplier<Iterator<T>> supplier, boolean enableRetry) {
+      this.supplier = supplier;
+      this.itr = supplier.get();
+      this.enableRetry = enableRetry;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return itr.hasNext();
+      } catch (StatusRuntimeException e) {
+        if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
+          // TODO: Retry Iterator from the last known position.
+          // TODO: Log the retry explicitly stating the token authentication failed and retrying
+          // the failed operation.
+          return supplier.get().hasNext();

Review comment:
       This should assign the new iterator to `this.itr` (which then can no longer be `final`) so that subsequent calls use the retried call rather than the failed one.
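
A minimal sketch of that fix, assuming a stand-in exception in place of gRPC's `StatusRuntimeException` with code `UNAUTHENTICATED` so the example runs without gRPC on the classpath. `RetryingIterator` and `UnauthenticatedException` are hypothetical names, not classes from the PR. The point is that the replacement iterator is stored in `itr`, so `next()` reads from the retried call.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Stand-in for a gRPC UNAUTHENTICATED status error (hypothetical). */
final class UnauthenticatedException extends RuntimeException {
  UnauthenticatedException(String message) {
    super(message);
  }
}

/** Iterator wrapper that re-issues the call once on an auth failure. */
final class RetryingIterator<T> implements Iterator<T> {
  private final Supplier<Iterator<T>> supplier;
  private final boolean retryOnUnauthorized;
  private Iterator<T> itr;  // not final: replaced when the call is retried
  private boolean retried;  // allow a single retry only

  RetryingIterator(Supplier<Iterator<T>> supplier, boolean retryOnUnauthorized) {
    this.supplier = supplier;
    this.retryOnUnauthorized = retryOnUnauthorized;
    this.itr = supplier.get();
  }

  @Override
  public boolean hasNext() {
    try {
      return itr.hasNext();
    } catch (UnauthenticatedException e) {
      if (!retryOnUnauthorized || retried) {
        throw e;
      }
      retried = true;
      itr = supplier.get(); // later hasNext()/next() calls use the new call
      return itr.hasNext();
    }
  }

  @Override
  public T next() {
    // Going through hasNext() lets the retry apply even if the caller skips it.
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return itr.next();
  }
}
```

Limiting the wrapper to one retry is a design choice of this sketch; it avoids looping forever if the refreshed credentials are rejected as well.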

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/CredentialCallOption.java
##########
@@ -28,7 +28,11 @@
  * Method option for supplying credentials to method calls.
  */
 public class CredentialCallOption implements CallOptions.GrpcCallOption {
-  private final Consumer<CallHeaders> credentialWriter;
+  public Consumer<CallHeaders> getCredentialWriter() {

Review comment:
       nit: can we keep getters after fields/constructor?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -388,6 +389,57 @@ public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption..
     }
   }
 
+  /**
+   * Wrapper class to wrap the iterator and handle auth failures and retry.
+   * @param <T> The type of iterator.
+   */
+  public static class WrappedFlightIterator<T> implements Iterator<T> {

Review comment:
       A general comment in the interest of code reuse: you may be better off implementing a generic `StreamObserver<T>` that can wrap any gRPC call. You would have to change FlightClient to always use the async gRPC stub instead of the blocking stub, but you could then handle retries for any gRPC call without writing a special-purpose wrapper for each one.
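
A rough sketch of what such a generic observer could look like, under stated assumptions. The `StreamObserver` interface below is a local stand-in that only mirrors the three callbacks of `io.grpc.stub.StreamObserver`, so the example compiles without gRPC. `CollectingObserver` and `RetryingCalls` are hypothetical names, not classes from the PR or from Flight. Results are handed over through a `CompletableFuture`, so the caller blocks on `join()` instead of polling a flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Local stand-in with the same callbacks as io.grpc.stub.StreamObserver. */
interface StreamObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

/** Collects every streamed value and completes a future when the call ends. */
final class CollectingObserver<T> implements StreamObserver<T> {
  private final List<T> values = new ArrayList<>();
  private final CompletableFuture<List<T>> done = new CompletableFuture<>();

  @Override
  public synchronized void onNext(T value) {
    values.add(value);
  }

  @Override
  public void onError(Throwable t) {
    done.completeExceptionally(t);
  }

  @Override
  public synchronized void onCompleted() {
    done.complete(new ArrayList<>(values));
  }

  CompletableFuture<List<T>> future() {
    return done;
  }
}

/** Runs any observer-style call and retries it once if the predicate matches. */
final class RetryingCalls {
  private RetryingCalls() {}

  static <T> List<T> call(Consumer<StreamObserver<T>> apiCall, Predicate<Throwable> shouldRetry) {
    try {
      return attempt(apiCall);
    } catch (CompletionException e) {
      if (!shouldRetry.test(e.getCause())) {
        throw e;
      }
      return attempt(apiCall); // second and final attempt
    }
  }

  private static <T> List<T> attempt(Consumer<StreamObserver<T>> apiCall) {
    final CollectingObserver<T> observer = new CollectingObserver<>();
    apiCall.accept(observer);
    return observer.future().join(); // blocks without a busy-wait loop
  }
}
```

In the real client, the predicate would presumably check for an `UNAUTHENTICATED` status, and `apiCall` would invoke a method on the async stub with the observer as its response handler.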

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth2/ClientIncomingAuthHeaderMiddleware.java
##########
@@ -34,27 +34,29 @@
    */
   public static class Factory implements FlightClientMiddleware.Factory {

Review comment:
       This is what I intended in the doc - thank you!







[GitHub] [arrow] lidavidm commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-865260887


   Following up here - given 5.0.0 is targeting July, do you think this will be ready in time?





[GitHub] [arrow] keeratsingh commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
keeratsingh commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-874285499


   > Following up here - given 5.0.0 is targeting July, do you think this will be ready in time?
   
   Hey @lidavidm, I am no longer working on this project. @tifflhl or @kylep-dremio would be better placed to give you a definitive answer.





[GitHub] [arrow] kylep-dremio commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
kylep-dremio commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-928161670


   @lidavidm - correct, we'll circle back to this, but the current priority is the C++ Flight SQL PR.





[GitHub] [arrow] lidavidm commented on pull request #8780: [POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8780:
URL: https://github.com/apache/arrow/pull/8780#issuecomment-928091968


   @kylep-dremio Just to follow up here, I'm assuming this isn't a priority at the moment?

