You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/17 22:35:04 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #11989: ARROW-15112: [Integration][C++][Java] Implement Flight SQL integration tests

lidavidm commented on a change in pull request #11989:
URL: https://github.com/apache/arrow/pull/11989#discussion_r771725031



##########
File path: cpp/src/arrow/flight/integration_tests/test_integration.h
##########
@@ -28,22 +28,27 @@
 
 namespace arrow {

Review comment:
       This isn't your mistake, but this file is missing a `#pragma once` up top - hence the CI failure.

##########
File path: java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import static com.google.protobuf.Any.pack;
+import static java.util.Collections.singletonList;
+
+import java.util.List;
+
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/**
+ * Hardcoded Flight SQL producer used for cross-language integration tests.
+ */
+public class FlightSqlScenarioProducer implements FlightSqlProducer {
+  private final BufferAllocator allocator;
+
+  public FlightSqlScenarioProducer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  static Schema getQuerySchema() {
+    return new Schema(
+        singletonList(
+            new Field("id", FieldType.nullable(new ArrowType.Int(64, true)), null)
+        )
+    );
+  }
+
+  @Override
+  public void createPreparedStatement(FlightSql.ActionCreatePreparedStatementRequest request,
+                                      CallContext context, StreamListener<Result> listener) {
+    IntegrationAssertions.assertTrue("Expect to be one of the two queries used on tests",
+        request.getQuery().equals("SELECT PREPARED STATEMENT") ||
+            request.getQuery().equals("UPDATE PREPARED STATEMENT"));
+
+    final FlightSql.ActionCreatePreparedStatementResult
+        result = FlightSql.ActionCreatePreparedStatementResult.newBuilder()
+        .setPreparedStatementHandle(ByteString.copyFromUtf8(request.getQuery() + " HANDLE"))
+        .build();
+    listener.onNext(new Result(pack(result).toByteArray()));
+    listener.onCompleted();
+  }
+
+  @Override
+  public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest request,
+                                     CallContext context, StreamListener<Result> listener) {
+    IntegrationAssertions.assertTrue("Expect to be one of the two queries used on tests",
+        request.getPreparedStatementHandle().toStringUtf8().equals("SELECT PREPARED STATEMENT HANDLE") ||
+            request.getPreparedStatementHandle().toStringUtf8().equals("UPDATE PREPARED STATEMENT HANDLE"));
+
+    listener.onCompleted();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoStatement(FlightSql.CommandStatementQuery command,
+                                           CallContext context, FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "SELECT STATEMENT");
+
+    ByteString handle = ByteString.copyFromUtf8("SELECT STATEMENT HANDLE");
+
+    FlightSql.TicketStatementQuery ticket = FlightSql.TicketStatementQuery.newBuilder()
+        .setStatementHandle(handle)
+        .build();
+    return getFlightInfoForSchema(ticket, descriptor, getQuerySchema());
+  }
+
+  @Override
+  public FlightInfo getFlightInfoPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
+                                                   CallContext context,
+                                                   FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "SELECT PREPARED STATEMENT HANDLE");
+
+    return getFlightInfoForSchema(command, descriptor, getQuerySchema());
+  }
+
+  @Override
+  public SchemaResult getSchemaStatement(FlightSql.CommandStatementQuery command,
+                                         CallContext context, FlightDescriptor descriptor) {
+    return new SchemaResult(getQuerySchema());
+  }
+
+  @Override
+  public void getStreamStatement(FlightSql.TicketStatementQuery ticket, CallContext context,
+                                 ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, getQuerySchema());
+  }
+
+  @Override
+  public void getStreamPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
+                                         CallContext context, ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, getQuerySchema());
+  }
+
+  private Runnable acceptPutReturnConstant(StreamListener<PutResult> ackStream, int value) {
+    return () -> {
+      final FlightSql.DoPutUpdateResult build =
+          FlightSql.DoPutUpdateResult.newBuilder().setRecordCount(value).build();
+
+      try (final ArrowBuf buffer = allocator.buffer(build.getSerializedSize())) {
+        buffer.writeBytes(build.toByteArray());
+        ackStream.onNext(PutResult.metadata(buffer));
+        ackStream.onCompleted();
+      }
+    };
+  }
+
+  @Override
+  public Runnable acceptPutStatement(FlightSql.CommandStatementUpdate command, CallContext context,
+                                     FlightStream flightStream,
+                                     StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "UPDATE STATEMENT");
+
+    return acceptPutReturnConstant(ackStream, 10000);
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementUpdate(FlightSql.CommandPreparedStatementUpdate command,
+                                                   CallContext context, FlightStream flightStream,
+                                                   StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "UPDATE PREPARED STATEMENT HANDLE");
+
+    return acceptPutReturnConstant(ackStream, 20000);
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementQuery(FlightSql.CommandPreparedStatementQuery command,
+                                                  CallContext context, FlightStream flightStream,
+                                                  StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle(),
+        "SELECT PREPARED STATEMENT HANDLE");
+
+    return null;
+  }
+
+  @Override
+  public FlightInfo getFlightInfoSqlInfo(FlightSql.CommandGetSqlInfo request, CallContext context,
+                                         FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(request.getInfoCount(), 2);
+    IntegrationAssertions.assertEquals(request.getInfo(0),
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME_VALUE);
+    IntegrationAssertions.assertEquals(request.getInfo(1),
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY_VALUE);
+
+    return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA);
+  }
+
+  @Override
+  public void getStreamSqlInfo(FlightSql.CommandGetSqlInfo command, CallContext context,
+                               ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, Schemas.GET_SQL_INFO_SCHEMA);
+  }
+
+  @Override
+  public FlightInfo getFlightInfoCatalogs(FlightSql.CommandGetCatalogs request, CallContext context,
+                                          FlightDescriptor descriptor) {
+    return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA);
+  }
+
+  private void serveJsonToStreamListener(ServerStreamListener stream, Schema schema) {

Review comment:
       What does this have to do with JSON?

##########
File path: cpp/src/arrow/CMakeLists.txt
##########
@@ -736,6 +736,10 @@ if(ARROW_FLIGHT_SQL)
   add_subdirectory(flight/sql)
 endif()
 
+if(ARROW_FLIGHT AND ARROW_BUILD_INTEGRATION)

Review comment:
       Should this condition also include `AND ARROW_FLIGHT_SQL`?

##########
File path: java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import static com.google.protobuf.Any.pack;
+import static java.util.Collections.singletonList;
+
+import java.util.List;
+
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/**
+ * Hardcoded Flight SQL producer used for cross-language integration tests.
+ */
+public class FlightSqlScenarioProducer implements FlightSqlProducer {
+  private final BufferAllocator allocator;
+
+  public FlightSqlScenarioProducer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  static Schema getQuerySchema() {
+    return new Schema(
+        singletonList(
+            new Field("id", FieldType.nullable(new ArrowType.Int(64, true)), null)
+        )
+    );
+  }
+
+  @Override
+  public void createPreparedStatement(FlightSql.ActionCreatePreparedStatementRequest request,
+                                      CallContext context, StreamListener<Result> listener) {
+    IntegrationAssertions.assertTrue("Expect to be one of the two queries used on tests",
+        request.getQuery().equals("SELECT PREPARED STATEMENT") ||
+            request.getQuery().equals("UPDATE PREPARED STATEMENT"));
+
+    final FlightSql.ActionCreatePreparedStatementResult
+        result = FlightSql.ActionCreatePreparedStatementResult.newBuilder()
+        .setPreparedStatementHandle(ByteString.copyFromUtf8(request.getQuery() + " HANDLE"))
+        .build();
+    listener.onNext(new Result(pack(result).toByteArray()));
+    listener.onCompleted();
+  }
+
+  @Override
+  public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest request,
+                                     CallContext context, StreamListener<Result> listener) {
+    IntegrationAssertions.assertTrue("Expect to be one of the two queries used on tests",
+        request.getPreparedStatementHandle().toStringUtf8().equals("SELECT PREPARED STATEMENT HANDLE") ||
+            request.getPreparedStatementHandle().toStringUtf8().equals("UPDATE PREPARED STATEMENT HANDLE"));
+
+    listener.onCompleted();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoStatement(FlightSql.CommandStatementQuery command,
+                                           CallContext context, FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "SELECT STATEMENT");
+
+    ByteString handle = ByteString.copyFromUtf8("SELECT STATEMENT HANDLE");
+
+    FlightSql.TicketStatementQuery ticket = FlightSql.TicketStatementQuery.newBuilder()
+        .setStatementHandle(handle)
+        .build();
+    return getFlightInfoForSchema(ticket, descriptor, getQuerySchema());
+  }
+
+  @Override
+  public FlightInfo getFlightInfoPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
+                                                   CallContext context,
+                                                   FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "SELECT PREPARED STATEMENT HANDLE");
+
+    return getFlightInfoForSchema(command, descriptor, getQuerySchema());
+  }
+
+  @Override
+  public SchemaResult getSchemaStatement(FlightSql.CommandStatementQuery command,
+                                         CallContext context, FlightDescriptor descriptor) {
+    return new SchemaResult(getQuerySchema());
+  }
+
+  @Override
+  public void getStreamStatement(FlightSql.TicketStatementQuery ticket, CallContext context,
+                                 ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, getQuerySchema());
+  }
+
+  @Override
+  public void getStreamPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
+                                         CallContext context, ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, getQuerySchema());
+  }
+
+  private Runnable acceptPutReturnConstant(StreamListener<PutResult> ackStream, int value) {
+    return () -> {
+      final FlightSql.DoPutUpdateResult build =
+          FlightSql.DoPutUpdateResult.newBuilder().setRecordCount(value).build();
+
+      try (final ArrowBuf buffer = allocator.buffer(build.getSerializedSize())) {
+        buffer.writeBytes(build.toByteArray());
+        ackStream.onNext(PutResult.metadata(buffer));
+        ackStream.onCompleted();
+      }
+    };
+  }
+
+  @Override
+  public Runnable acceptPutStatement(FlightSql.CommandStatementUpdate command, CallContext context,
+                                     FlightStream flightStream,
+                                     StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "UPDATE STATEMENT");
+
+    return acceptPutReturnConstant(ackStream, 10000);
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementUpdate(FlightSql.CommandPreparedStatementUpdate command,
+                                                   CallContext context, FlightStream flightStream,
+                                                   StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "UPDATE PREPARED STATEMENT HANDLE");
+
+    return acceptPutReturnConstant(ackStream, 20000);
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementQuery(FlightSql.CommandPreparedStatementQuery command,
+                                                  CallContext context, FlightStream flightStream,
+                                                  StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle(),
+        "SELECT PREPARED STATEMENT HANDLE");
+
+    return null;
+  }
+
+  @Override
+  public FlightInfo getFlightInfoSqlInfo(FlightSql.CommandGetSqlInfo request, CallContext context,
+                                         FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(request.getInfoCount(), 2);
+    IntegrationAssertions.assertEquals(request.getInfo(0),
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME_VALUE);
+    IntegrationAssertions.assertEquals(request.getInfo(1),
+        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY_VALUE);
+
+    return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA);
+  }
+
+  @Override
+  public void getStreamSqlInfo(FlightSql.CommandGetSqlInfo command, CallContext context,
+                               ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, Schemas.GET_SQL_INFO_SCHEMA);
+  }
+
+  @Override
+  public FlightInfo getFlightInfoCatalogs(FlightSql.CommandGetCatalogs request, CallContext context,
+                                          FlightDescriptor descriptor) {
+    return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA);
+  }
+
+  private void serveJsonToStreamListener(ServerStreamListener stream, Schema schema) {

Review comment:
       Maybe rename this to `putEmptyBatchToStreamListener`?

##########
File path: java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import static com.google.protobuf.Any.pack;
+import static java.util.Collections.singletonList;
+
+import java.util.List;
+
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/**
+ * Hardcoded Flight SQL producer used for cross-language integration tests.
+ */
+public class FlightSqlScenarioProducer implements FlightSqlProducer {
+  private final BufferAllocator allocator;
+
+  public FlightSqlScenarioProducer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  static Schema getQuerySchema() {
+    return new Schema(
+        singletonList(
+            new Field("id", FieldType.nullable(new ArrowType.Int(64, true)), null)
+        )
+    );
+  }
+
+  @Override
+  public void createPreparedStatement(FlightSql.ActionCreatePreparedStatementRequest request,
+                                      CallContext context, StreamListener<Result> listener) {
+    IntegrationAssertions.assertTrue("Expect to be one of the two queries used on tests",
+        request.getQuery().equals("SELECT PREPARED STATEMENT") ||
+            request.getQuery().equals("UPDATE PREPARED STATEMENT"));
+
+    final FlightSql.ActionCreatePreparedStatementResult
+        result = FlightSql.ActionCreatePreparedStatementResult.newBuilder()
+        .setPreparedStatementHandle(ByteString.copyFromUtf8(request.getQuery() + " HANDLE"))
+        .build();
+    listener.onNext(new Result(pack(result).toByteArray()));
+    listener.onCompleted();
+  }
+
+  @Override
+  public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest request,
+                                     CallContext context, StreamListener<Result> listener) {
+    IntegrationAssertions.assertTrue("Expect to be one of the two queries used on tests",
+        request.getPreparedStatementHandle().toStringUtf8().equals("SELECT PREPARED STATEMENT HANDLE") ||
+            request.getPreparedStatementHandle().toStringUtf8().equals("UPDATE PREPARED STATEMENT HANDLE"));
+
+    listener.onCompleted();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoStatement(FlightSql.CommandStatementQuery command,
+                                           CallContext context, FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "SELECT STATEMENT");
+
+    ByteString handle = ByteString.copyFromUtf8("SELECT STATEMENT HANDLE");
+
+    FlightSql.TicketStatementQuery ticket = FlightSql.TicketStatementQuery.newBuilder()
+        .setStatementHandle(handle)
+        .build();
+    return getFlightInfoForSchema(ticket, descriptor, getQuerySchema());
+  }
+
+  @Override
+  public FlightInfo getFlightInfoPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
+                                                   CallContext context,
+                                                   FlightDescriptor descriptor) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "SELECT PREPARED STATEMENT HANDLE");
+
+    return getFlightInfoForSchema(command, descriptor, getQuerySchema());
+  }
+
+  @Override
+  public SchemaResult getSchemaStatement(FlightSql.CommandStatementQuery command,
+                                         CallContext context, FlightDescriptor descriptor) {
+    return new SchemaResult(getQuerySchema());
+  }
+
+  @Override
+  public void getStreamStatement(FlightSql.TicketStatementQuery ticket, CallContext context,
+                                 ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, getQuerySchema());
+  }
+
+  @Override
+  public void getStreamPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
+                                         CallContext context, ServerStreamListener listener) {
+    serveJsonToStreamListener(listener, getQuerySchema());
+  }
+
+  private Runnable acceptPutReturnConstant(StreamListener<PutResult> ackStream, int value) {
+    return () -> {
+      final FlightSql.DoPutUpdateResult build =
+          FlightSql.DoPutUpdateResult.newBuilder().setRecordCount(value).build();
+
+      try (final ArrowBuf buffer = allocator.buffer(build.getSerializedSize())) {
+        buffer.writeBytes(build.toByteArray());
+        ackStream.onNext(PutResult.metadata(buffer));
+        ackStream.onCompleted();
+      }
+    };
+  }
+
+  @Override
+  public Runnable acceptPutStatement(FlightSql.CommandStatementUpdate command, CallContext context,
+                                     FlightStream flightStream,
+                                     StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "UPDATE STATEMENT");
+
+    return acceptPutReturnConstant(ackStream, 10000);
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementUpdate(FlightSql.CommandPreparedStatementUpdate command,
+                                                   CallContext context, FlightStream flightStream,
+                                                   StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "UPDATE PREPARED STATEMENT HANDLE");
+
+    return acceptPutReturnConstant(ackStream, 20000);
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementQuery(FlightSql.CommandPreparedStatementQuery command,
+                                                  CallContext context, FlightStream flightStream,
+                                                  StreamListener<PutResult> ackStream) {
+    IntegrationAssertions.assertEquals(command.getPreparedStatementHandle(),
+        "SELECT PREPARED STATEMENT HANDLE");
+
+    return null;

Review comment:
       Hmm, this should throw an exception if it's not implemented, rather than returning `null`.

##########
File path: java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import static com.google.protobuf.Any.pack;
+import static java.util.Collections.singletonList;
+
+import java.util.List;
+
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/**
+ * Hardcoded Flight SQL producer used for cross-language integration tests.
+ */
+public class FlightSqlScenarioProducer implements FlightSqlProducer {
+  private final BufferAllocator allocator;
+
+  public FlightSqlScenarioProducer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  static Schema getQuerySchema() {

Review comment:
       Can we explicitly document this as the schema returned for a mock select query (both here and in C++)?

##########
File path: cpp/src/arrow/flight/integration_tests/CMakeLists.txt
##########
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(arrow_flight_integration_tests)

Review comment:
       Is the target necessary?

##########
File path: cpp/src/arrow/flight/integration_tests/test_integration.cc
##########
@@ -0,0 +1,665 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/integration_tests/test_integration.h"
+#include "arrow/flight/client_middleware.h"
+#include "arrow/flight/server_middleware.h"
+#include "arrow/flight/sql/client.h"
+#include "arrow/flight/sql/server.h"
+#include "arrow/flight/test_util.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/dictionary.h"
+
+#include <arrow/testing/gtest_util.h>
+#include <arrow/testing/json_integration.h>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace arrow {
+namespace flight {
+namespace integration_tests {
+
+/// \brief Server side of the basic auth integration test.
+///
+/// Every action replies with the identity of the authenticated peer.
+class AuthBasicProtoServer : public FlightServerBase {
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result) override {
+    // Echo the authenticated username back as the only entry of the stream.
+    std::vector<Result> replies{Result{Buffer::FromString(context.peer_identity())}};
+    result->reset(new SimpleResultStream(std::move(replies)));
+    return Status::OK();
+  }
+};
+
+/// \brief Run a DoAction call and validate the results it streams back.
+///
+/// \param[in] client the client used to issue the call
+/// \param[in] action the action to send to the server
+/// \param[in] results the expected result bodies, in order
+/// \return OK if the stream yields exactly `results`; Invalid if it ends early,
+///   contains a mismatching body, or has extra entries; otherwise the error
+///   returned by the underlying call
+Status CheckActionResults(FlightClient* client, const Action& action,
+                          std::vector<std::string> results) {
+  std::unique_ptr<ResultStream> stream;
+  RETURN_NOT_OK(client->DoAction(action, &stream));
+  std::unique_ptr<Result> result;
+  for (const std::string& expected : results) {
+    RETURN_NOT_OK(stream->Next(&result));
+    if (!result) {
+      return Status::Invalid("Action result stream ended early");
+    }
+    const auto actual = result->body->ToString();
+    if (expected != actual) {
+      // The message pieces are concatenated verbatim, so the separating
+      // spaces have to be part of the literals.
+      return Status::Invalid("Got wrong result; expected ", expected, " but got ",
+                             actual);
+    }
+  }
+  // The stream must be exhausted once all expected results were seen.
+  RETURN_NOT_OK(stream->Next(&result));
+  if (result) {
+    return Status::Invalid("Action result stream had too many entries");
+  }
+  return Status::OK();
+}
+
+// The username for the basic auth integration test; handed to both the
+// server-side and the client-side auth handler.
+constexpr auto kAuthUsername = "arrow";
+// The password for the basic auth integration test; handed to both the
+// server-side and the client-side auth handler.
+constexpr auto kAuthPassword = "flight";
+
+/// \brief Scenario exercising the basic auth protobuf.
+///
+/// The client first issues an unauthenticated call and expects it to be
+/// rejected, then authenticates and expects the server to echo the username.
+class AuthBasicProtoScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    options->auth_handler =
+        std::make_shared<TestServerBasicAuthHandler>(kAuthUsername, kAuthPassword);
+    server->reset(new AuthBasicProtoServer());
+    return Status::OK();
+  }
+
+  // The client needs no special options for this scenario.
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    Action action;
+    std::unique_ptr<ResultStream> stream;
+    // No credentials have been presented yet, so this call has to be rejected.
+    const Status status = client->DoAction(action, &stream);
+    const std::shared_ptr<FlightStatusDetail> detail =
+        FlightStatusDetail::UnwrapStatus(status);
+    if (!detail) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", status.ToString());
+    }
+    if (detail->code() != FlightStatusCode::Unauthenticated) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", detail->ToString());
+    }
+
+    // Authenticate, after which the action result should be the username.
+    std::unique_ptr<ClientAuthHandler> auth_handler(
+        new TestClientBasicAuthHandler(kAuthUsername, kAuthPassword));
+    RETURN_NOT_OK(client->Authenticate({}, std::move(auth_handler)));
+    return CheckActionResults(client.get(), action, {kAuthUsername});
+  }
+};
+
+/// \brief Test middleware that echoes back the value of a particular
+/// incoming header.
+///
+/// In Java, gRPC may consolidate this header with HTTP/2 trailers if
+/// the call fails, but C++ generally doesn't do this. The integration
+/// test confirms the presence of this header to ensure we can read it
+/// regardless of what gRPC does.
+class TestServerMiddleware : public ServerMiddleware {
+ public:
+  /// \param[in] received the header value to echo back to the client
+  explicit TestServerMiddleware(std::string received)
+      : received_(std::move(received)) {}
+
+  /// \brief Attach the stored value as the "x-middleware" outgoing header.
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", received_);
+  }
+
+  /// \brief Nothing to do when the call finishes.
+  void CallCompleted(const Status& status) override {}
+
+  std::string name() const override { return "GrpcTrailersMiddleware"; }
+
+ private:
+  // The value handed to the constructor; sent back on every response.
+  std::string received_;
+};
+
+/// \brief Factory producing a TestServerMiddleware for each incoming call.
+///
+/// The middleware is seeded with the first "x-middleware" header value of the
+/// call, or with an empty string when no such header was received.
+class TestServerMiddlewareFactory : public ServerMiddlewareFactory {
+ public:
+  Status StartCall(const CallInfo& info, const CallHeaders& incoming_headers,
+                   std::shared_ptr<ServerMiddleware>* middleware) override {
+    const auto range = incoming_headers.equal_range("x-middleware");
+    const bool has_header = range.first != range.second;
+    *middleware = std::make_shared<TestServerMiddleware>(
+        has_header ? std::string(range.first->second) : std::string());
+    return Status::OK();
+  }
+};
+
+/// \brief Test middleware that adds a header on every outgoing call,
+/// and gets the value of the expected header sent by the server.
+class TestClientMiddleware : public ClientMiddleware {
+ public:
+  /// \param[out] received_header where the received "x-middleware" value is
+  ///   stored; the pointer is kept, not copied, so it must outlive this object
+  explicit TestClientMiddleware(std::string* received_header)
+      : received_header_(received_header) {}
+
+  /// \brief Add the "x-middleware" header to the outgoing call.
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", "expected value");
+  }
+
+  /// \brief Record the first "x-middleware" header value, if any was received.
+  void ReceivedHeaders(const CallHeaders& incoming_headers) override {
+    // We expect the server to always send this header. gRPC/Java may
+    // send it in trailers instead of headers, so we expect Flight to
+    // account for this.
+    const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& iter_pair =
+        incoming_headers.equal_range("x-middleware");
+    if (iter_pair.first != iter_pair.second) {
+      const util::string_view& value = (*iter_pair.first).second;
+      *received_header_ = std::string(value);
+    }
+  }
+
+  /// \brief Nothing to do when the call finishes.
+  void CallCompleted(const Status& status) override {}
+
+ private:
+  // Not owned; points at storage provided by whoever created this middleware.
+  std::string* received_header_;
+};
+
+/// \brief Factory creating a TestClientMiddleware for each call.
+///
+/// Every middleware it creates writes into the factory's `received_header_`,
+/// which the test then inspects.
+class TestClientMiddlewareFactory : public ClientMiddlewareFactory {
+ public:
+  void StartCall(const CallInfo& info,
+                 std::unique_ptr<ClientMiddleware>* middleware) override {
+    *middleware =
+        std::unique_ptr<ClientMiddleware>(new TestClientMiddleware(&received_header_));
+  }
+
+  // Written by the middleware created in StartCall whenever it receives an
+  // "x-middleware" header.
+  std::string received_header_;
+};
+
+/// \brief Server for the middleware scenario.
+///
+/// Only GetFlightInfo is implemented: the command "success" yields a dummy
+/// FlightInfo, anything else yields an error, so that both the passing and the
+/// failing path can be tested.
+class MiddlewareServer : public FlightServerBase {
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    const bool should_succeed =
+        descriptor.type == FlightDescriptor::DescriptorType::CMD &&
+        descriptor.cmd == "success";
+    if (!should_succeed) {
+      // Fail the call immediately. In some gRPC implementations, this
+      // means that gRPC sends only HTTP/2 trailers and not headers. We want
+      // Flight middleware to be agnostic to this difference.
+      return Status::UnknownError("Unknown");
+    }
+
+    // Return a fake location - the test doesn't read it
+    Location location;
+    RETURN_NOT_OK(Location::ForGrpcTcp("localhost", 10010, &location));
+    const std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {location}}};
+    const std::shared_ptr<Schema> empty_schema = arrow::schema({});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*empty_schema, descriptor, endpoints, -1, -1));
+    result->reset(new FlightInfo(info));
+    return Status::OK();
+  }
+};
+
+/// \brief The middleware scenario.
+///
+/// This tests that the server and client get expected header values, both
+/// on a call that fails and on a call that succeeds.
+class MiddlewareScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    options->middleware.push_back(
+        {"grpc_trailers", std::make_shared<TestServerMiddlewareFactory>()});
+    server->reset(new MiddlewareServer());
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override {
+    // Keep a handle on the factory so RunClient can read the received header.
+    client_middleware_ = std::make_shared<TestClientMiddlewareFactory>();
+    options->middleware.push_back(client_middleware_);
+    return Status::OK();
+  }
+
+  /// \brief Check that the last call delivered the echoed "x-middleware" header.
+  ///
+  /// \return OK if the client middleware recorded "expected value", Invalid
+  ///   (quoting the value that was recorded instead) otherwise
+  Status CheckReceivedHeader() const {
+    if (client_middleware_->received_header_ != "expected value") {
+      return Status::Invalid(
+          "Expected to receive header 'x-middleware: expected value', but instead got: '",
+          client_middleware_->received_header_, "'");
+    }
+    return Status::OK();
+  }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    std::unique_ptr<FlightInfo> info;
+    // This call is expected to fail. In gRPC/Java, this causes the
+    // server to combine headers and HTTP/2 trailers, so to read the
+    // expected header, Flight must check for both headers and
+    // trailers.
+    if (client->GetFlightInfo(FlightDescriptor::Command(""), &info).ok()) {
+      return Status::Invalid("Expected call to fail");
+    }
+    RETURN_NOT_OK(CheckReceivedHeader());
+    std::cerr << "Headers received successfully on failing call." << std::endl;
+
+    // This call should succeed. Clear the value recorded by the previous call
+    // so that a stale header cannot satisfy the check.
+    client_middleware_->received_header_ = "";
+    RETURN_NOT_OK(client->GetFlightInfo(FlightDescriptor::Command("success"), &info));
+    RETURN_NOT_OK(CheckReceivedHeader());
+    std::cerr << "Headers received successfully on passing call." << std::endl;
+    return Status::OK();
+  }
+
+  std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
+};
+
+/// \brief Schema returned for the mock select query of the Flight SQL
+/// integration test: a single int64 column named "id".
+std::shared_ptr<Schema> GetQuerySchema() {
+  const auto id_field = arrow::field("id", int64());
+  return arrow::schema({id_field});
+}
+
+/// \brief Compare two values, reporting a mismatch as an Invalid status.
+///
+/// \param[in] expected the value the test expects
+/// \param[in] actual the value that was actually observed
+/// \return OK when both values compare equal, otherwise Invalid with a message
+///   quoting both values
+template <typename T>
+arrow::Status AssertEq(const T& expected, const T& actual) {
+  if (expected != actual) {
+    // Both values are wrapped in matching double quotes.
+    return Status::Invalid("Expected \"", expected, "\", got \"", actual, "\"");
+  }
+  return Status::OK();
+}
+
+/// \brief The server used for testing Flight SQL.
+///
+/// This is a static Flight SQL server: it only asserts that the commands sent
+/// during the integration tests are parsed correctly, and returns the expected
+/// schemas so that the client can validate them.
+class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
+ public:
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
+      const ServerCallContext& context, const sql::StatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT STATEMENT", command.query));
+
+    ARROW_ASSIGN_OR_RAISE(auto handle,
+                          sql::CreateStatementQueryTicket("SELECT STATEMENT HANDLE"));
+
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
+    ARROW_ASSIGN_OR_RAISE(
+        auto result, FlightInfo::Make(*GetQuerySchema(), descriptor, endpoints, -1, -1))
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
+      const ServerCallContext& context,
+      const sql::StatementQueryTicket& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPreparedStatement(
+      const ServerCallContext& context, const sql::PreparedStatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return GetFlightInfoForCommand(descriptor, GetQuerySchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPreparedStatement(
+      const ServerCallContext& context,
+      const sql::PreparedStatementQuery& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCatalogs(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCatalogs(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, command.info.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME, command.info[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY, command.info[1]));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetDbSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTables(
+      const ServerCallContext& context, const sql::GetTables& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table_filter_pattern",
+                                              command.table_name_filter_pattern.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, command.table_types.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_types[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("view", command.table_types[1]));
+    ARROW_RETURN_NOT_OK(AssertEq<bool>(true, command.include_schema));
+
+    return GetFlightInfoForCommand(descriptor,
+                                   sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTables(
+      const ServerCallContext& context, const sql::GetTables& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTableTypes(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTableTypes(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetPrimaryKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetPrimaryKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetExportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetExportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  /// \brief Validate a GetCrossReference command and describe its result set.
+  ///
+  /// \return a FlightInfo advertising the cross-reference schema (the same
+  ///   schema DoGetCrossReference streams), or Invalid if any of the
+  ///   primary/foreign key table references differs from the expected value
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command,
+      const FlightDescriptor& descriptor) override {
+    // NOTE(review): `.value()` assumes the client populated the optional
+    // catalog/db_schema fields - confirm against the client scenario.
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_catalog", command.pk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_db_schema", command.pk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_table", command.pk_table_ref.table));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_catalog", command.fk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_db_schema", command.fk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_table", command.fk_table_ref.table));
+
+    // Advertise the schema that DoGetCrossReference actually returns.
+    return GetFlightInfoForCommand(descriptor,
+                                   sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  arrow::Result<int64_t> DoPutCommandStatementUpdate(
+      const ServerCallContext& context, const sql::StatementUpdate& command) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE STATEMENT", command.query));
+
+    return 10000;
+  }
+
+  arrow::Result<sql::ActionCreatePreparedStatementResult> CreatePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionCreatePreparedStatementRequest& request) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<bool>(true, request.query == "SELECT PREPARED STATEMENT" ||
+                                 request.query == "UPDATE PREPARED STATEMENT"));
+
+    sql::ActionCreatePreparedStatementResult result;
+    result.prepared_statement_handle = request.query + " HANDLE";
+
+    return result;
+  }
+
+  Status ClosePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionClosePreparedStatementRequest& request) override {
+    return Status::OK();
+  }
+
+  Status DoPutPreparedStatementQuery(const ServerCallContext& context,
+                                     const sql::PreparedStatementQuery& command,
+                                     FlightMessageReader* reader,
+                                     FlightMetadataWriter* writer) override {
+    return Status::NotImplemented("Not implemented");
+  }
+
+  arrow::Result<int64_t> DoPutPreparedStatementUpdate(
+      const ServerCallContext& context, const sql::PreparedStatementUpdate& command,
+      FlightMessageReader* reader) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return 20000;
+  }
+
+ private:
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
+      const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
+    ARROW_ASSIGN_OR_RAISE(auto result,
+                          FlightInfo::Make(*schema, descriptor, endpoints, -1, -1))
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetForTestCase(
+      std::shared_ptr<Schema> schema) {
+    ARROW_ASSIGN_OR_RAISE(auto reader2, RecordBatchReader::Make({}, schema));

Review comment:
       Also, why is it `reader2`?

##########
File path: cpp/src/arrow/flight/integration_tests/test_integration.cc
##########
@@ -0,0 +1,665 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/integration_tests/test_integration.h"
+#include "arrow/flight/client_middleware.h"
+#include "arrow/flight/server_middleware.h"
+#include "arrow/flight/sql/client.h"
+#include "arrow/flight/sql/server.h"
+#include "arrow/flight/test_util.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/dictionary.h"
+
+#include <arrow/testing/gtest_util.h>
+#include <arrow/testing/json_integration.h>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace arrow {
+namespace flight {
+namespace integration_tests {
+
+/// \brief Server side of the basic auth integration test.
+///
+/// Every action replies with the identity of the authenticated peer.
+class AuthBasicProtoServer : public FlightServerBase {
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result) override {
+    // Echo the authenticated username back as the only entry of the stream.
+    std::vector<Result> replies{Result{Buffer::FromString(context.peer_identity())}};
+    result->reset(new SimpleResultStream(std::move(replies)));
+    return Status::OK();
+  }
+};
+
+/// \brief Run a DoAction call and validate the results it streams back.
+///
+/// \param[in] client the client used to issue the call
+/// \param[in] action the action to send to the server
+/// \param[in] results the expected result bodies, in order
+/// \return OK if the stream yields exactly `results`; Invalid if it ends early,
+///   contains a mismatching body, or has extra entries; otherwise the error
+///   returned by the underlying call
+Status CheckActionResults(FlightClient* client, const Action& action,
+                          std::vector<std::string> results) {
+  std::unique_ptr<ResultStream> stream;
+  RETURN_NOT_OK(client->DoAction(action, &stream));
+  std::unique_ptr<Result> result;
+  for (const std::string& expected : results) {
+    RETURN_NOT_OK(stream->Next(&result));
+    if (!result) {
+      return Status::Invalid("Action result stream ended early");
+    }
+    const auto actual = result->body->ToString();
+    if (expected != actual) {
+      // The message pieces are concatenated verbatim, so the separating
+      // spaces have to be part of the literals.
+      return Status::Invalid("Got wrong result; expected ", expected, " but got ",
+                             actual);
+    }
+  }
+  // The stream must be exhausted once all expected results were seen.
+  RETURN_NOT_OK(stream->Next(&result));
+  if (result) {
+    return Status::Invalid("Action result stream had too many entries");
+  }
+  return Status::OK();
+}
+
+// The username for the basic auth integration test; handed to both the
+// server-side and the client-side auth handler.
+constexpr auto kAuthUsername = "arrow";
+// The password for the basic auth integration test; handed to both the
+// server-side and the client-side auth handler.
+constexpr auto kAuthPassword = "flight";
+
+/// \brief Scenario exercising the basic auth protobuf.
+///
+/// The client first issues an unauthenticated call and expects it to be
+/// rejected, then authenticates and expects the server to echo the username.
+class AuthBasicProtoScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    options->auth_handler =
+        std::make_shared<TestServerBasicAuthHandler>(kAuthUsername, kAuthPassword);
+    server->reset(new AuthBasicProtoServer());
+    return Status::OK();
+  }
+
+  // The client needs no special options for this scenario.
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    Action action;
+    std::unique_ptr<ResultStream> stream;
+    // No credentials have been presented yet, so this call has to be rejected.
+    const Status status = client->DoAction(action, &stream);
+    const std::shared_ptr<FlightStatusDetail> detail =
+        FlightStatusDetail::UnwrapStatus(status);
+    if (!detail) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", status.ToString());
+    }
+    if (detail->code() != FlightStatusCode::Unauthenticated) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", detail->ToString());
+    }
+
+    // Authenticate, after which the action result should be the username.
+    std::unique_ptr<ClientAuthHandler> auth_handler(
+        new TestClientBasicAuthHandler(kAuthUsername, kAuthPassword));
+    RETURN_NOT_OK(client->Authenticate({}, std::move(auth_handler)));
+    return CheckActionResults(client.get(), action, {kAuthUsername});
+  }
+};
+
+/// \brief Test middleware that echoes back the value of a particular
+/// incoming header.
+///
+/// In Java, gRPC may consolidate this header with HTTP/2 trailers if
+/// the call fails, but C++ generally doesn't do this. The integration
+/// test confirms the presence of this header to ensure we can read it
+/// regardless of what gRPC does.
+class TestServerMiddleware : public ServerMiddleware {
+ public:
+  /// \param[in] received the header value to echo back to the client
+  explicit TestServerMiddleware(std::string received)
+      : received_(std::move(received)) {}
+
+  /// \brief Attach the stored value as the "x-middleware" outgoing header.
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", received_);
+  }
+
+  /// \brief Nothing to do when the call finishes.
+  void CallCompleted(const Status& status) override {}
+
+  std::string name() const override { return "GrpcTrailersMiddleware"; }
+
+ private:
+  // The value handed to the constructor; sent back on every response.
+  std::string received_;
+};
+
+/// \brief Factory producing a TestServerMiddleware for each incoming call.
+///
+/// The middleware is seeded with the first "x-middleware" header value of the
+/// call, or with an empty string when no such header was received.
+class TestServerMiddlewareFactory : public ServerMiddlewareFactory {
+ public:
+  Status StartCall(const CallInfo& info, const CallHeaders& incoming_headers,
+                   std::shared_ptr<ServerMiddleware>* middleware) override {
+    const auto range = incoming_headers.equal_range("x-middleware");
+    const bool has_header = range.first != range.second;
+    *middleware = std::make_shared<TestServerMiddleware>(
+        has_header ? std::string(range.first->second) : std::string());
+    return Status::OK();
+  }
+};
+
+/// \brief Test middleware that adds a header on every outgoing call,
+/// and gets the value of the expected header sent by the server.
+class TestClientMiddleware : public ClientMiddleware {
+ public:
+  /// \param[out] received_header where the received "x-middleware" value is
+  ///   stored; the pointer is kept, not copied, so it must outlive this object
+  explicit TestClientMiddleware(std::string* received_header)
+      : received_header_(received_header) {}
+
+  /// \brief Add the "x-middleware" header to the outgoing call.
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", "expected value");
+  }
+
+  /// \brief Record the first "x-middleware" header value, if any was received.
+  void ReceivedHeaders(const CallHeaders& incoming_headers) override {
+    // We expect the server to always send this header. gRPC/Java may
+    // send it in trailers instead of headers, so we expect Flight to
+    // account for this.
+    const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& iter_pair =
+        incoming_headers.equal_range("x-middleware");
+    if (iter_pair.first != iter_pair.second) {
+      const util::string_view& value = (*iter_pair.first).second;
+      *received_header_ = std::string(value);
+    }
+  }
+
+  /// \brief Nothing to do when the call finishes.
+  void CallCompleted(const Status& status) override {}
+
+ private:
+  // Not owned; points at storage provided by whoever created this middleware.
+  std::string* received_header_;
+};
+
+/// \brief Factory creating a TestClientMiddleware for each call.
+///
+/// Every middleware it creates writes into the factory's `received_header_`,
+/// which the test then inspects.
+class TestClientMiddlewareFactory : public ClientMiddlewareFactory {
+ public:
+  void StartCall(const CallInfo& info,
+                 std::unique_ptr<ClientMiddleware>* middleware) override {
+    *middleware =
+        std::unique_ptr<ClientMiddleware>(new TestClientMiddleware(&received_header_));
+  }
+
+  // Written by the middleware created in StartCall whenever it receives an
+  // "x-middleware" header.
+  std::string received_header_;
+};
+
+/// \brief Server for the middleware scenario.
+///
+/// Only GetFlightInfo is implemented: the command "success" yields a dummy
+/// FlightInfo, anything else yields an error, so that both the passing and the
+/// failing path can be tested.
+class MiddlewareServer : public FlightServerBase {
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    const bool should_succeed =
+        descriptor.type == FlightDescriptor::DescriptorType::CMD &&
+        descriptor.cmd == "success";
+    if (!should_succeed) {
+      // Fail the call immediately. In some gRPC implementations, this
+      // means that gRPC sends only HTTP/2 trailers and not headers. We want
+      // Flight middleware to be agnostic to this difference.
+      return Status::UnknownError("Unknown");
+    }
+
+    // Return a fake location - the test doesn't read it
+    Location location;
+    RETURN_NOT_OK(Location::ForGrpcTcp("localhost", 10010, &location));
+    const std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {location}}};
+    const std::shared_ptr<Schema> empty_schema = arrow::schema({});
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*empty_schema, descriptor, endpoints, -1, -1));
+    result->reset(new FlightInfo(info));
+    return Status::OK();
+  }
+};
+
+/// \brief The middleware scenario.
+///
+/// This tests that the server and client get expected header values, both
+/// on a call that fails and on a call that succeeds.
+class MiddlewareScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    options->middleware.push_back(
+        {"grpc_trailers", std::make_shared<TestServerMiddlewareFactory>()});
+    server->reset(new MiddlewareServer());
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override {
+    // Keep a handle on the factory so RunClient can read the received header.
+    client_middleware_ = std::make_shared<TestClientMiddlewareFactory>();
+    options->middleware.push_back(client_middleware_);
+    return Status::OK();
+  }
+
+  /// \brief Check that the last call delivered the echoed "x-middleware" header.
+  ///
+  /// \return OK if the client middleware recorded "expected value", Invalid
+  ///   (quoting the value that was recorded instead) otherwise
+  Status CheckReceivedHeader() const {
+    if (client_middleware_->received_header_ != "expected value") {
+      return Status::Invalid(
+          "Expected to receive header 'x-middleware: expected value', but instead got: '",
+          client_middleware_->received_header_, "'");
+    }
+    return Status::OK();
+  }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    std::unique_ptr<FlightInfo> info;
+    // This call is expected to fail. In gRPC/Java, this causes the
+    // server to combine headers and HTTP/2 trailers, so to read the
+    // expected header, Flight must check for both headers and
+    // trailers.
+    if (client->GetFlightInfo(FlightDescriptor::Command(""), &info).ok()) {
+      return Status::Invalid("Expected call to fail");
+    }
+    RETURN_NOT_OK(CheckReceivedHeader());
+    std::cerr << "Headers received successfully on failing call." << std::endl;
+
+    // This call should succeed. Clear the value recorded by the previous call
+    // so that a stale header cannot satisfy the check.
+    client_middleware_->received_header_ = "";
+    RETURN_NOT_OK(client->GetFlightInfo(FlightDescriptor::Command("success"), &info));
+    RETURN_NOT_OK(CheckReceivedHeader());
+    std::cerr << "Headers received successfully on passing call." << std::endl;
+    return Status::OK();
+  }
+
+  std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
+};
+
+/// \brief Schema used for the query results of the Flight SQL scenario: a
+/// single int64 field named "id".
+std::shared_ptr<Schema> GetQuerySchema() {
+  auto id_field = arrow::field("id", int64());
+  return arrow::schema({id_field});
+}
+
+/// \brief Compare two values and report a mismatch as a Status.
+///
+/// \param[in] expected the value the test expects
+/// \param[in] actual the value that was actually observed
+/// \return Status::OK() when the values compare equal, otherwise
+/// Status::Invalid with a message containing both values
+template <typename T>
+arrow::Status AssertEq(const T& expected, const T& actual) {
+  if (expected != actual) {
+    // Both values are wrapped in matching double quotes.
+    return Status::Invalid("Expected \"", expected, "\", got \"", actual, "\"");
+  }
+  return Status::OK();
+}
+
+/// \brief The server used for testing Flight SQL. It implements a static
+/// Flight SQL server that only asserts that the commands sent during the
+/// integration tests are parsed correctly, and returns the expected schemas
+/// to be validated by the client.
+class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
+ public:
+  /// \brief Handle GetFlightInfo for an ad-hoc statement query.
+  ///
+  /// Fails with Status::Invalid unless the query is "SELECT STATEMENT". On
+  /// success returns a FlightInfo carrying the query schema and one endpoint
+  /// whose ticket is built from the handle "SELECT STATEMENT HANDLE".
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
+      const ServerCallContext& context, const sql::StatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT STATEMENT", command.query));
+
+    ARROW_ASSIGN_OR_RAISE(auto handle,
+                          sql::CreateStatementQueryTicket("SELECT STATEMENT HANDLE"));
+
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
+    // Terminate the macro invocation explicitly, like the other uses in this
+    // file, instead of relying on how the macro happens to expand.
+    ARROW_ASSIGN_OR_RAISE(
+        auto result, FlightInfo::Make(*GetQuerySchema(), descriptor, endpoints, -1, -1));
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  /// \brief Handle DoGet for a statement query ticket.
+  ///
+  /// The ticket is not inspected; the stream is built by DoGetForTestCase
+  /// from the query schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
+      const ServerCallContext& context,
+      const sql::StatementQueryTicket& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  /// \brief Handle GetFlightInfo for a prepared statement query.
+  ///
+  /// Fails with Status::Invalid unless the handle is
+  /// "SELECT PREPARED STATEMENT HANDLE"; otherwise answers with the query
+  /// schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPreparedStatement(
+      const ServerCallContext& context, const sql::PreparedStatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return GetFlightInfoForCommand(descriptor, GetQuerySchema());
+  }
+
+  /// \brief Handle DoGet for a prepared statement query.
+  ///
+  /// The command is not inspected; the stream is built by DoGetForTestCase
+  /// from the query schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPreparedStatement(
+      const ServerCallContext& context,
+      const sql::PreparedStatementQuery& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the catalogs command by answering with
+  /// the catalogs schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCatalogs(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  /// \brief Handle DoGet for the catalogs command; the stream is built by
+  /// DoGetForTestCase from the catalogs schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCatalogs(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetSqlInfo command.
+  ///
+  /// Expects exactly two requested info codes, FLIGHT_SQL_SERVER_NAME followed
+  /// by FLIGHT_SQL_SERVER_READ_ONLY, and fails with Status::Invalid otherwise.
+  /// On success answers with the SqlInfo schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, command.info.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME, command.info[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY, command.info[1]));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  /// \brief Handle DoGet for the GetSqlInfo command; the command is not
+  /// inspected and the stream is built from the SqlInfo schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetDbSchemas command.
+  ///
+  /// Checks that the catalog is "catalog" and the schema filter pattern is
+  /// "db_schema_filter_pattern", then answers with the DB schemas schema.
+  /// Both optional fields are read with value(), so they must be set.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  /// \brief Handle DoGet for the GetDbSchemas command; the command is not
+  /// inspected and the stream is built from the DB schemas schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetDbSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetTables command.
+  ///
+  /// Verifies every field of the command (catalog, both filter patterns, the
+  /// two table types "table" and "view", and include_schema == true) and then
+  /// answers with the tables schema that includes the table schema column.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTables(
+      const ServerCallContext& context, const sql::GetTables& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& table_types = command.table_types;
+
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table_filter_pattern",
+                                              command.table_name_filter_pattern.value()));
+    // The size is checked before the elements are indexed.
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, table_types.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", table_types[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("view", table_types[1]));
+    ARROW_RETURN_NOT_OK(AssertEq<bool>(true, command.include_schema));
+
+    const auto schema = sql::SqlSchema::GetTablesSchemaWithIncludedSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  /// \brief Handle DoGet for the GetTables command; the command is not
+  /// inspected and the stream is built from the tables schema that includes
+  /// the table schema column.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTables(
+      const ServerCallContext& context, const sql::GetTables& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the table types command by answering
+  /// with the table types schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTableTypes(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  /// \brief Handle DoGet for the table types command; the stream is built
+  /// from the table types schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTableTypes(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetPrimaryKeys command.
+  ///
+  /// Checks that the table reference is catalog "catalog", schema
+  /// "db_schema", table "table", then answers with the primary keys schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& ref = command.table_ref;
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema", ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", ref.table));
+
+    const auto schema = sql::SqlSchema::GetPrimaryKeysSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  /// \brief Handle DoGet for the GetPrimaryKeys command; the command is not
+  /// inspected and the stream is built from the primary keys schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetPrimaryKeysSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetExportedKeys command.
+  ///
+  /// Checks that the table reference is catalog "catalog", schema
+  /// "db_schema", table "table", then answers with the exported keys schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& ref = command.table_ref;
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema", ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", ref.table));
+
+    const auto schema = sql::SqlSchema::GetExportedKeysSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  /// \brief Handle DoGet for the GetExportedKeys command; the command is not
+  /// inspected and the stream is built from the exported keys schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetExportedKeysSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetImportedKeys command.
+  ///
+  /// Checks that the table reference is catalog "catalog", schema
+  /// "db_schema", table "table", then answers with the imported keys schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& ref = command.table_ref;
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema", ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", ref.table));
+
+    const auto schema = sql::SqlSchema::GetImportedKeysSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  /// \brief Handle DoGet for the GetImportedKeys command; the command is not
+  /// inspected and the stream is built from the imported keys schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetCrossReference command.
+  ///
+  /// Checks the primary-key table reference ("pk_catalog", "pk_db_schema",
+  /// "pk_table") and the foreign-key table reference ("fk_catalog",
+  /// "fk_db_schema", "fk_table"), failing with Status::Invalid on the first
+  /// mismatch. On success answers with the cross reference schema, i.e. the
+  /// same schema that DoGetCrossReference streams.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_catalog", command.pk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_db_schema", command.pk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_table", command.pk_table_ref.table));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_catalog", command.fk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_db_schema", command.fk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_table", command.fk_table_ref.table));
+
+    // Advertise the cross reference schema rather than the table types
+    // schema, so the FlightInfo agrees with the data stream.
+    return GetFlightInfoForCommand(descriptor,
+                                   sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  /// \brief Handle DoGet for the GetCrossReference command; the command is
+  /// not inspected and the stream is built from the cross reference schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  /// \brief Handle DoPut for an ad-hoc update statement.
+  ///
+  /// Fails with Status::Invalid unless the query is "UPDATE STATEMENT";
+  /// otherwise returns the fixed value 10000 (presumably reported to the
+  /// client as the number of affected rows - confirm against the client).
+  arrow::Result<int64_t> DoPutCommandStatementUpdate(
+      const ServerCallContext& context, const sql::StatementUpdate& command) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE STATEMENT", command.query));
+
+    return 10000;
+  }
+
+  /// \brief Handle the CreatePreparedStatement action.
+  ///
+  /// Accepts only the queries "SELECT PREPARED STATEMENT" and
+  /// "UPDATE PREPARED STATEMENT"; the returned handle is the query text with
+  /// " HANDLE" appended.
+  arrow::Result<sql::ActionCreatePreparedStatementResult> CreatePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionCreatePreparedStatementRequest& request) override {
+    const bool is_known_query = request.query == "SELECT PREPARED STATEMENT" ||
+                                request.query == "UPDATE PREPARED STATEMENT";
+    ARROW_RETURN_NOT_OK(AssertEq<bool>(true, is_known_query));
+
+    sql::ActionCreatePreparedStatementResult created;
+    created.prepared_statement_handle = request.query + " HANDLE";
+    return created;
+  }
+
+  /// \brief Handle the ClosePreparedStatement action: the request is not
+  /// inspected and the call always succeeds.
+  Status ClosePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionClosePreparedStatementRequest& request) override {
+    return Status::OK();
+  }
+
+  /// \brief Handle DoPut for a prepared statement query. Not supported by
+  /// this test server: always returns Status::NotImplemented.
+  Status DoPutPreparedStatementQuery(const ServerCallContext& context,
+                                     const sql::PreparedStatementQuery& command,
+                                     FlightMessageReader* reader,
+                                     FlightMetadataWriter* writer) override {
+    return Status::NotImplemented("Not implemented");
+  }
+
+  /// \brief Handle DoPut for a prepared update statement.
+  ///
+  /// Fails with Status::Invalid unless the handle is
+  /// "UPDATE PREPARED STATEMENT HANDLE"; otherwise returns the fixed value
+  /// 20000 (presumably reported to the client as the number of affected
+  /// rows - confirm against the client). The reader is not consumed.
+  arrow::Result<int64_t> DoPutPreparedStatementUpdate(
+      const ServerCallContext& context, const sql::PreparedStatementUpdate& command,
+      FlightMessageReader* reader) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return 20000;
+  }
+
+ private:
+  /// \brief Build the FlightInfo answer shared by the command handlers.
+  ///
+  /// \param[in] descriptor the descriptor of the incoming request; its cmd is
+  /// reused as the ticket of the single endpoint
+  /// \param[in] schema the schema to advertise in the FlightInfo
+  /// \return a FlightInfo with one endpoint, no locations, and -1 passed for
+  /// both trailing count arguments of FlightInfo::Make
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
+      const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
+    // Terminate the macro invocation explicitly, like the other uses in this
+    // file, instead of relying on how the macro happens to expand.
+    ARROW_ASSIGN_OR_RAISE(auto result,
+                          FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetForTestCase(
+      std::shared_ptr<Schema> schema) {
+    ARROW_ASSIGN_OR_RAISE(auto reader2, RecordBatchReader::Make({}, schema));

Review comment:
       nit: `std::move(schema)`? (or declare the parameter as `const&`)

##########
File path: cpp/src/arrow/flight/integration_tests/test_integration.cc
##########
@@ -0,0 +1,665 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/integration_tests/test_integration.h"
+#include "arrow/flight/client_middleware.h"
+#include "arrow/flight/server_middleware.h"
+#include "arrow/flight/sql/client.h"
+#include "arrow/flight/sql/server.h"
+#include "arrow/flight/test_util.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/dictionary.h"
+
+#include <arrow/testing/gtest_util.h>
+#include <arrow/testing/json_integration.h>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace arrow {
+namespace flight {
+namespace integration_tests {
+
+/// \brief Server for the basic auth integration test: every action is
+/// answered with the identity of the calling peer.
+class AuthBasicProtoServer : public FlightServerBase {
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result) override {
+    // The single result body is the authenticated username.
+    auto identity = Buffer::FromString(context.peer_identity());
+    result->reset(new SimpleResultStream({Result{identity}}));
+    return Status::OK();
+  }
+};
+
+/// \brief Validate the results of a DoAction call.
+///
+/// \param[in] client the client used to issue the action
+/// \param[in] action the action to execute
+/// \param[in] results the expected result bodies, in order
+/// \return Status::OK() if the stream yields exactly the expected bodies,
+/// Status::Invalid if it ends early, yields a different body or yields extra
+/// entries, or the error returned by the client or the stream
+Status CheckActionResults(FlightClient* client, const Action& action,
+                          std::vector<std::string> results) {
+  std::unique_ptr<ResultStream> stream;
+  RETURN_NOT_OK(client->DoAction(action, &stream));
+  std::unique_ptr<Result> result;
+  for (const std::string& expected : results) {
+    RETURN_NOT_OK(stream->Next(&result));
+    if (!result) {
+      return Status::Invalid("Action result stream ended early");
+    }
+    const auto actual = result->body->ToString();
+    if (expected != actual) {
+      // Separate the values from the surrounding words so they stay readable.
+      return Status::Invalid("Got wrong result; expected ", expected, " but got ",
+                             actual);
+    }
+  }
+  // The stream must be exhausted once every expected body has been seen.
+  RETURN_NOT_OK(stream->Next(&result));
+  if (result) {
+    return Status::Invalid("Action result stream had too many entries");
+  }
+  return Status::OK();
+}
+
+// Username the basic auth integration test authenticates with.
+constexpr const char* kAuthUsername = "arrow";
+// Password the basic auth integration test authenticates with.
+constexpr const char* kAuthPassword = "flight";
+
+/// \brief Scenario for the basic auth protobuf: an unauthenticated action
+/// must be rejected, and after authenticating the server must answer with
+/// the username.
+class AuthBasicProtoScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    options->auth_handler =
+        std::make_shared<TestServerBasicAuthHandler>(kAuthUsername, kAuthPassword);
+    server->reset(new AuthBasicProtoServer());
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    Action action;
+    std::unique_ptr<ResultStream> stream;
+    // This client is unauthenticated and should fail.
+    const Status unauthenticated = client->DoAction(action, &stream);
+    const std::shared_ptr<FlightStatusDetail> detail =
+        FlightStatusDetail::UnwrapStatus(unauthenticated);
+    if (!detail) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ",
+                             unauthenticated.ToString());
+    }
+    if (detail->code() != FlightStatusCode::Unauthenticated) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", detail->ToString());
+    }
+
+    // Authenticate, then expect the server to answer with the username.
+    std::unique_ptr<ClientAuthHandler> client_handler(
+        new TestClientBasicAuthHandler(kAuthUsername, kAuthPassword));
+    RETURN_NOT_OK(client->Authenticate({}, std::move(client_handler)));
+    return CheckActionResults(client.get(), action, {kAuthUsername});
+  }
+};
+
+/// \brief Test middleware that echoes back the value of a particular
+/// incoming header.
+///
+/// In Java, gRPC may consolidate this header with HTTP/2 trailers if
+/// the call fails, but C++ generally doesn't do this. The integration
+/// test confirms the presence of this header to ensure we can read it
+/// regardless of what gRPC does.
+class TestServerMiddleware : public ServerMiddleware {
+ public:
+  /// \param[in] received the header value to echo back to the client
+  explicit TestServerMiddleware(std::string received)
+      : received_(std::move(received)) {}
+
+  /// \brief Add the stored value as the "x-middleware" response header.
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", received_);
+  }
+
+  /// \brief Nothing to do when the call completes.
+  void CallCompleted(const Status& status) override {}
+
+  std::string name() const override { return "GrpcTrailersMiddleware"; }
+
+ private:
+  std::string received_;
+};
+
+/// \brief Server middleware factory that captures the first "x-middleware"
+/// request header (or an empty string when it is absent) and hands it to a
+/// new TestServerMiddleware.
+class TestServerMiddlewareFactory : public ServerMiddlewareFactory {
+ public:
+  Status StartCall(const CallInfo& info, const CallHeaders& incoming_headers,
+                   std::shared_ptr<ServerMiddleware>* middleware) override {
+    const auto range = incoming_headers.equal_range("x-middleware");
+    std::string received;
+    if (range.first != range.second) {
+      // Only the first occurrence of the header is used.
+      received = std::string(range.first->second);
+    }
+    *middleware = std::make_shared<TestServerMiddleware>(received);
+    return Status::OK();
+  }
+};
+
+/// \brief Test middleware that adds a header on every outgoing call,
+/// and gets the value of the expected header sent by the server.
+class TestClientMiddleware : public ClientMiddleware {
+ public:
+  /// \param[out] received_header where the received header value is stored;
+  /// not owned, so it must outlive this middleware
+  explicit TestClientMiddleware(std::string* received_header)
+      : received_header_(received_header) {}
+
+  /// \brief Attach the "x-middleware" header to the outgoing call.
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", "expected value");
+  }
+
+  /// \brief Record the first "x-middleware" header sent by the server, if any.
+  void ReceivedHeaders(const CallHeaders& incoming_headers) override {
+    // We expect the server to always send this header. gRPC/Java may
+    // send it in trailers instead of headers, so we expect Flight to
+    // account for this.
+    const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& iter_pair =
+        incoming_headers.equal_range("x-middleware");
+    if (iter_pair.first != iter_pair.second) {
+      const util::string_view& value = (*iter_pair.first).second;
+      *received_header_ = std::string(value);
+    }
+  }
+
+  /// \brief Nothing to do when the call completes.
+  void CallCompleted(const Status& status) override {}
+
+ private:
+  std::string* received_header_;
+};
+
+/// \brief Client middleware factory that attaches a TestClientMiddleware to
+/// every call and exposes the header value recorded by that middleware.
+class TestClientMiddlewareFactory : public ClientMiddlewareFactory {
+ public:
+  /// \brief Create the middleware for a new call.
+  ///
+  /// The middleware receives a pointer to received_header_, so this factory
+  /// must outlive every middleware it creates.
+  void StartCall(const CallInfo& info,
+                 std::unique_ptr<ClientMiddleware>* middleware) override {
+    middleware->reset(new TestClientMiddleware(&received_header_));
+  }
+
+  /// Header value most recently recorded by a middleware instance. Public so
+  /// that the scenario can inspect and reset it between calls.
+  std::string received_header_;
+};
+
+/// \brief Flight server backing the middleware scenario.
+///
+/// Only GetFlightInfo is overridden: a CMD descriptor whose command is
+/// "success" yields a FlightInfo with an empty schema, anything else fails,
+/// so that both the passing and the failing path can be exercised.
+class MiddlewareServer : public FlightServerBase {
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    const bool should_succeed =
+        descriptor.type == FlightDescriptor::DescriptorType::CMD &&
+        descriptor.cmd == "success";
+    if (!should_succeed) {
+      // Fail the call immediately. In some gRPC implementations, this
+      // means that gRPC sends only HTTP/2 trailers and not headers. We want
+      // Flight middleware to be agnostic to this difference.
+      return Status::UnknownError("Unknown");
+    }
+
+    // The location is a placeholder - the test doesn't read it.
+    Location fake_location;
+    RETURN_NOT_OK(Location::ForGrpcTcp("localhost", 10010, &fake_location));
+    const std::shared_ptr<Schema> empty_schema = arrow::schema({});
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {fake_location}}};
+    ARROW_ASSIGN_OR_RAISE(
+        auto info, FlightInfo::Make(*empty_schema, descriptor, endpoints, -1, -1));
+    result->reset(new FlightInfo(info));
+    return Status::OK();
+  }
+};
+
+/// \brief Scenario exercising client and server middleware.
+///
+/// Verifies that the client sees the expected "x-middleware" header value
+/// both when the call fails and when it succeeds.
+class MiddlewareScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    server->reset(new MiddlewareServer());
+    options->middleware.push_back(
+        {"grpc_trailers", std::make_shared<TestServerMiddlewareFactory>()});
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override {
+    client_middleware_ = std::make_shared<TestClientMiddlewareFactory>();
+    options->middleware.push_back(client_middleware_);
+    return Status::OK();
+  }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    // Alias of the factory's field, so it reflects every later update.
+    std::string& received = client_middleware_->received_header_;
+    std::unique_ptr<FlightInfo> info;
+
+    // This call is expected to fail. In gRPC/Java, this causes the
+    // server to combine headers and HTTP/2 trailers, so to read the
+    // expected header, Flight must check for both headers and
+    // trailers.
+    const Status failing_call =
+        client->GetFlightInfo(FlightDescriptor::Command(""), &info);
+    if (failing_call.ok()) {
+      return Status::Invalid("Expected call to fail");
+    }
+    if (received != "expected value") {
+      return Status::Invalid(
+          "Expected to receive header 'x-middleware: expected value', but instead got: '",
+          received, "'");
+    }
+    std::cerr << "Headers received successfully on failing call." << std::endl;
+
+    // This call should succeed; forget the value seen on the first call.
+    received.clear();
+    RETURN_NOT_OK(client->GetFlightInfo(FlightDescriptor::Command("success"), &info));
+    if (received != "expected value") {
+      return Status::Invalid(
+          "Expected to receive header 'x-middleware: expected value', but instead got '",
+          received, "'");
+    }
+    std::cerr << "Headers received successfully on passing call." << std::endl;
+    return Status::OK();
+  }
+
+  std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
+};
+
+/// \brief Schema used for the query results of the Flight SQL scenario: a
+/// single int64 field named "id".
+std::shared_ptr<Schema> GetQuerySchema() {
+  auto id_field = arrow::field("id", int64());
+  return arrow::schema({id_field});
+}
+
+/// \brief Compare two values and report a mismatch as a Status.
+///
+/// \param[in] expected the value the test expects
+/// \param[in] actual the value that was actually observed
+/// \return Status::OK() when the values compare equal, otherwise
+/// Status::Invalid with a message containing both values
+template <typename T>
+arrow::Status AssertEq(const T& expected, const T& actual) {
+  if (expected != actual) {
+    // Both values are wrapped in matching double quotes.
+    return Status::Invalid("Expected \"", expected, "\", got \"", actual, "\"");
+  }
+  return Status::OK();
+}
+
+/// \brief The server used for testing Flight SQL. It implements a static
+/// Flight SQL server that only asserts that the commands sent during the
+/// integration tests are parsed correctly, and returns the expected schemas
+/// to be validated by the client.
+class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
+ public:
+  /// \brief Handle GetFlightInfo for an ad-hoc statement query.
+  ///
+  /// Fails with Status::Invalid unless the query is "SELECT STATEMENT". On
+  /// success returns a FlightInfo carrying the query schema and one endpoint
+  /// whose ticket is built from the handle "SELECT STATEMENT HANDLE".
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
+      const ServerCallContext& context, const sql::StatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT STATEMENT", command.query));
+
+    ARROW_ASSIGN_OR_RAISE(auto handle,
+                          sql::CreateStatementQueryTicket("SELECT STATEMENT HANDLE"));
+
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
+    // Terminate the macro invocation explicitly, like the other uses in this
+    // file, instead of relying on how the macro happens to expand.
+    ARROW_ASSIGN_OR_RAISE(
+        auto result, FlightInfo::Make(*GetQuerySchema(), descriptor, endpoints, -1, -1));
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  /// \brief Handle DoGet for a statement query ticket.
+  ///
+  /// The ticket is not inspected; the stream is built by DoGetForTestCase
+  /// from the query schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
+      const ServerCallContext& context,
+      const sql::StatementQueryTicket& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  /// \brief Handle GetFlightInfo for a prepared statement query.
+  ///
+  /// Fails with Status::Invalid unless the handle is
+  /// "SELECT PREPARED STATEMENT HANDLE"; otherwise answers with the query
+  /// schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPreparedStatement(
+      const ServerCallContext& context, const sql::PreparedStatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return GetFlightInfoForCommand(descriptor, GetQuerySchema());
+  }
+
+  /// \brief Handle DoGet for a prepared statement query.
+  ///
+  /// The command is not inspected; the stream is built by DoGetForTestCase
+  /// from the query schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPreparedStatement(
+      const ServerCallContext& context,
+      const sql::PreparedStatementQuery& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the catalogs command by answering with
+  /// the catalogs schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCatalogs(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  /// \brief Handle DoGet for the catalogs command; the stream is built by
+  /// DoGetForTestCase from the catalogs schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCatalogs(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetSqlInfo command.
+  ///
+  /// Expects exactly two requested info codes, FLIGHT_SQL_SERVER_NAME followed
+  /// by FLIGHT_SQL_SERVER_READ_ONLY, and fails with Status::Invalid otherwise.
+  /// On success answers with the SqlInfo schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, command.info.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME, command.info[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY, command.info[1]));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  /// \brief Handle DoGet for the GetSqlInfo command; the command is not
+  /// inspected and the stream is built from the SqlInfo schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetDbSchemas command.
+  ///
+  /// Checks that the catalog is "catalog" and the schema filter pattern is
+  /// "db_schema_filter_pattern", then answers with the DB schemas schema.
+  /// Both optional fields are read with value(), so they must be set.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  /// \brief Handle DoGet for the GetDbSchemas command; the command is not
+  /// inspected and the stream is built from the DB schemas schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetDbSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetTables command.
+  ///
+  /// Verifies every field of the command (catalog, both filter patterns, the
+  /// two table types "table" and "view", and include_schema == true) and then
+  /// answers with the tables schema that includes the table schema column.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTables(
+      const ServerCallContext& context, const sql::GetTables& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& table_types = command.table_types;
+
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table_filter_pattern",
+                                              command.table_name_filter_pattern.value()));
+    // The size is checked before the elements are indexed.
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, table_types.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", table_types[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("view", table_types[1]));
+    ARROW_RETURN_NOT_OK(AssertEq<bool>(true, command.include_schema));
+
+    const auto schema = sql::SqlSchema::GetTablesSchemaWithIncludedSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  /// \brief Handle DoGet for the GetTables command; the command is not
+  /// inspected and the stream is built from the tables schema that includes
+  /// the table schema column.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTables(
+      const ServerCallContext& context, const sql::GetTables& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the table types command by answering
+  /// with the table types schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTableTypes(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  /// \brief Handle DoGet for the table types command; the stream is built
+  /// from the table types schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTableTypes(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetPrimaryKeys command.
+  ///
+  /// Checks that the table reference is catalog "catalog", schema
+  /// "db_schema", table "table", then answers with the primary keys schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& ref = command.table_ref;
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema", ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", ref.table));
+
+    const auto schema = sql::SqlSchema::GetPrimaryKeysSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  /// \brief Handle DoGet for the GetPrimaryKeys command; the command is not
+  /// inspected and the stream is built from the primary keys schema.
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetPrimaryKeysSchema());
+  }
+
+  /// \brief Handle GetFlightInfo for the GetExportedKeys command.
+  ///
+  /// Checks that the table reference is catalog "catalog", schema
+  /// "db_schema", table "table", then answers with the exported keys schema.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    const auto& ref = command.table_ref;
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema", ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", ref.table));
+
+    const auto schema = sql::SqlSchema::GetExportedKeysSchema();
+    return GetFlightInfoForCommand(descriptor, schema);
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetExportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  /// \brief Check the GetCrossReference command sent by the client and answer
+  /// with a FlightInfo advertising the cross-reference schema.
+  ///
+  /// Returns the first failing AssertEq status if any primary-key or foreign-key
+  /// table reference field differs from the literal the test client sends.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_catalog", command.pk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_db_schema", command.pk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_table", command.pk_table_ref.table));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_catalog", command.fk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_db_schema", command.fk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_table", command.fk_table_ref.table));
+
+    // Advertise the same schema that DoGetCrossReference streams back.
+    return GetFlightInfoForCommand(descriptor,
+                                   sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  arrow::Result<int64_t> DoPutCommandStatementUpdate(
+      const ServerCallContext& context, const sql::StatementUpdate& command) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE STATEMENT", command.query));
+
+    return 10000;
+  }
+
+  arrow::Result<sql::ActionCreatePreparedStatementResult> CreatePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionCreatePreparedStatementRequest& request) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<bool>(true, request.query == "SELECT PREPARED STATEMENT" ||
+                                 request.query == "UPDATE PREPARED STATEMENT"));
+
+    sql::ActionCreatePreparedStatementResult result;
+    result.prepared_statement_handle = request.query + " HANDLE";
+
+    return result;
+  }
+
+  Status ClosePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionClosePreparedStatementRequest& request) override {
+    return Status::OK();
+  }
+
+  Status DoPutPreparedStatementQuery(const ServerCallContext& context,
+                                     const sql::PreparedStatementQuery& command,
+                                     FlightMessageReader* reader,
+                                     FlightMetadataWriter* writer) override {
+    return Status::NotImplemented("Not implemented");

Review comment:
       Shall we test this flow as well?

##########
File path: cpp/src/arrow/flight/integration_tests/test_integration.cc
##########
@@ -0,0 +1,665 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/integration_tests/test_integration.h"
+#include "arrow/flight/client_middleware.h"
+#include "arrow/flight/server_middleware.h"
+#include "arrow/flight/sql/client.h"
+#include "arrow/flight/sql/server.h"
+#include "arrow/flight/test_util.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/dictionary.h"
+
+#include <arrow/testing/gtest_util.h>
+#include <arrow/testing/json_integration.h>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace arrow {
+namespace flight {
+namespace integration_tests {
+
+/// \brief The server for the basic auth integration test.
+class AuthBasicProtoServer : public FlightServerBase {
+  Status DoAction(const ServerCallContext& context, const Action& action,
+                  std::unique_ptr<ResultStream>* result) override {
+    // Respond with the authenticated username.
+    auto buf = Buffer::FromString(context.peer_identity());
+    *result = std::unique_ptr<ResultStream>(new SimpleResultStream({Result{buf}}));
+    return Status::OK();
+  }
+};
+
+/// \brief Run a DoAction call and validate the streamed results.
+///
+/// Each result body must equal the corresponding entry of `results`, in order,
+/// and the stream must end right after the last expected entry.
+///
+/// \param[in] client the client used to issue the action
+/// \param[in] action the action to run
+/// \param[in] results the expected result bodies, in stream order
+/// \return OK on an exact match; Invalid if the stream ends early, yields a
+///   different body, or has extra entries; otherwise the error of the failing call
+Status CheckActionResults(FlightClient* client, const Action& action,
+                          std::vector<std::string> results) {
+  std::unique_ptr<ResultStream> stream;
+  RETURN_NOT_OK(client->DoAction(action, &stream));
+  std::unique_ptr<Result> result;
+  for (const std::string& expected : results) {
+    RETURN_NOT_OK(stream->Next(&result));
+    if (!result) {
+      return Status::Invalid("Action result stream ended early");
+    }
+    const auto actual = result->body->ToString();
+    if (expected != actual) {
+      // The message pieces are concatenated as-is, so the separating spaces
+      // have to be part of the literals.
+      return Status::Invalid("Got wrong result; expected ", expected, " but got ",
+                             actual);
+    }
+  }
+  // One more read: the stream must be exhausted at this point.
+  RETURN_NOT_OK(stream->Next(&result));
+  if (result) {
+    return Status::Invalid("Action result stream had too many entries");
+  }
+  return Status::OK();
+}
+
+// The expected username for the basic auth integration test.
+constexpr auto kAuthUsername = "arrow";
+// The expected password for the basic auth integration test.
+constexpr auto kAuthPassword = "flight";
+
+/// \brief A scenario testing the basic auth protobuf.
+class AuthBasicProtoScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    server->reset(new AuthBasicProtoServer());
+    options->auth_handler =
+        std::make_shared<TestServerBasicAuthHandler>(kAuthUsername, kAuthPassword);
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    Action action;
+    std::unique_ptr<ResultStream> stream;
+    std::shared_ptr<FlightStatusDetail> detail;
+    const auto& status = client->DoAction(action, &stream);
+    detail = FlightStatusDetail::UnwrapStatus(status);
+    // This client is unauthenticated and should fail.
+    if (detail == nullptr) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", status.ToString());
+    }
+    if (detail->code() != FlightStatusCode::Unauthenticated) {
+      return Status::Invalid("Expected UNAUTHENTICATED but got ", detail->ToString());
+    }
+
+    auto client_handler = std::unique_ptr<ClientAuthHandler>(
+        new TestClientBasicAuthHandler(kAuthUsername, kAuthPassword));
+    RETURN_NOT_OK(client->Authenticate({}, std::move(client_handler)));
+    return CheckActionResults(client.get(), action, {kAuthUsername});
+  }
+};
+
+/// \brief Test middleware that echoes back the value of a particular
+/// incoming header.
+///
+/// In Java, gRPC may consolidate this header with HTTP/2 trailers if
+/// the call fails, but C++ generally doesn't do this. The integration
+/// test confirms the presence of this header to ensure we can read it
+/// regardless of what gRPC does.
+class TestServerMiddleware : public ServerMiddleware {
+ public:
+  explicit TestServerMiddleware(std::string received) : received_(received) {}
+
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", received_);
+  }
+
+  void CallCompleted(const Status& status) override {}
+
+  std::string name() const override { return "GrpcTrailersMiddleware"; }
+
+ private:
+  std::string received_;
+};
+
+class TestServerMiddlewareFactory : public ServerMiddlewareFactory {
+ public:
+  Status StartCall(const CallInfo& info, const CallHeaders& incoming_headers,
+                   std::shared_ptr<ServerMiddleware>* middleware) override {
+    const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& iter_pair =
+        incoming_headers.equal_range("x-middleware");
+    std::string received = "";
+    if (iter_pair.first != iter_pair.second) {
+      const util::string_view& value = (*iter_pair.first).second;
+      received = std::string(value);
+    }
+    *middleware = std::make_shared<TestServerMiddleware>(received);
+    return Status::OK();
+  }
+};
+
+/// \brief Test middleware that adds a header on every outgoing call,
+/// and gets the value of the expected header sent by the server.
+///
+/// The received value is written through the pointer given at construction;
+/// this class only stores that pointer and does not own the string.
+class TestClientMiddleware : public ClientMiddleware {
+ public:
+  explicit TestClientMiddleware(std::string* received_header)
+      : received_header_(received_header) {}
+
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
+    outgoing_headers->AddHeader("x-middleware", "expected value");
+  }
+
+  void ReceivedHeaders(const CallHeaders& incoming_headers) override {
+    // We expect the server to always send this header. gRPC/Java may
+    // send it in trailers instead of headers, so we expect Flight to
+    // account for this.
+    const std::pair<CallHeaders::const_iterator, CallHeaders::const_iterator>& iter_pair =
+        incoming_headers.equal_range("x-middleware");
+    if (iter_pair.first != iter_pair.second) {
+      // Only the first value of the header is recorded.
+      const util::string_view& value = (*iter_pair.first).second;
+      *received_header_ = std::string(value);
+    }
+  }
+
+  void CallCompleted(const Status& status) override {}
+
+ private:
+  // Destination for the received "x-middleware" header value (not owned).
+  std::string* received_header_;
+};
+
+/// \brief Factory creating a TestClientMiddleware for each call.
+///
+/// Every middleware instance it creates writes the received "x-middleware"
+/// header into `received_header_`, which the scenario reads after each call.
+class TestClientMiddlewareFactory : public ClientMiddlewareFactory {
+ public:
+  void StartCall(const CallInfo& info,
+                 std::unique_ptr<ClientMiddleware>* middleware) override {
+    *middleware =
+        std::unique_ptr<ClientMiddleware>(new TestClientMiddleware(&received_header_));
+  }
+
+  // Last header value recorded by a middleware created by this factory.
+  std::string received_header_;
+};
+
+/// \brief The server used for testing middleware. Implements only one
+/// endpoint, GetFlightInfo, in such a way that it either succeeds or
+/// returns an error based on the input, in order to test both paths.
+class MiddlewareServer : public FlightServerBase {
+  Status GetFlightInfo(const ServerCallContext& context,
+                       const FlightDescriptor& descriptor,
+                       std::unique_ptr<FlightInfo>* result) override {
+    if (descriptor.type == FlightDescriptor::DescriptorType::CMD &&
+        descriptor.cmd == "success") {
+      // Don't fail
+      std::shared_ptr<Schema> schema = arrow::schema({});
+      Location location;
+      // Return a fake location - the test doesn't read it
+      RETURN_NOT_OK(Location::ForGrpcTcp("localhost", 10010, &location));
+      std::vector<FlightEndpoint> endpoints{FlightEndpoint{{"foo"}, {location}}};
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
+      *result = std::unique_ptr<FlightInfo>(new FlightInfo(info));
+      return Status::OK();
+    }
+    // Fail the call immediately. In some gRPC implementations, this
+    // means that gRPC sends only HTTP/2 trailers and not headers. We want
+    // Flight middleware to be agnostic to this difference.
+    return Status::UnknownError("Unknown");
+  }
+};
+
+/// \brief The middleware scenario.
+///
+/// This tests that the server and client get expected header values.
+class MiddlewareScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    options->middleware.push_back(
+        {"grpc_trailers", std::make_shared<TestServerMiddlewareFactory>()});
+    server->reset(new MiddlewareServer());
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override {
+    client_middleware_ = std::make_shared<TestClientMiddlewareFactory>();
+    options->middleware.push_back(client_middleware_);
+    return Status::OK();
+  }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    std::unique_ptr<FlightInfo> info;
+    // This call is expected to fail. In gRPC/Java, this causes the
+    // server to combine headers and HTTP/2 trailers, so to read the
+    // expected header, Flight must check for both headers and
+    // trailers.
+    if (client->GetFlightInfo(FlightDescriptor::Command(""), &info).ok()) {
+      return Status::Invalid("Expected call to fail");
+    }
+    if (client_middleware_->received_header_ != "expected value") {
+      return Status::Invalid(
+          "Expected to receive header 'x-middleware: expected value', but instead got: '",
+          client_middleware_->received_header_, "'");
+    }
+    std::cerr << "Headers received successfully on failing call." << std::endl;
+
+    // This call should succeed
+    client_middleware_->received_header_ = "";
+    RETURN_NOT_OK(client->GetFlightInfo(FlightDescriptor::Command("success"), &info));
+    if (client_middleware_->received_header_ != "expected value") {
+      return Status::Invalid(
+          "Expected to receive header 'x-middleware: expected value', but instead got '",
+          client_middleware_->received_header_, "'");
+    }
+    std::cerr << "Headers received successfully on passing call." << std::endl;
+    return Status::OK();
+  }
+
+  std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
+};
+
+std::shared_ptr<Schema> GetQuerySchema() {
+  return arrow::schema({arrow::field("id", int64())});
+}
+
+/// \brief Compare two values and report a mismatch as an Invalid status.
+///
+/// \param[in] expected the value the test expects
+/// \param[in] actual the value actually received
+/// \return OK if both compare equal, otherwise Invalid with both values quoted
+template <typename T>
+arrow::Status AssertEq(const T& expected, const T& actual) {
+  if (expected != actual) {
+    return Status::Invalid("Expected \"", expected, "\", got \"", actual, "\"");
+  }
+  return Status::OK();
+}
+
+/// \brief The server used for testing Flight SQL.
+///
+/// This is a static Flight SQL server: it only asserts that the commands sent
+/// during the integration tests are parsed correctly, and returns the expected
+/// schemas so that the client can validate them.
+class FlightSqlScenarioServer : public sql::FlightSqlServerBase {
+ public:
+  /// \brief Check the ad-hoc query text and answer with a FlightInfo whose
+  /// single endpoint carries a statement-query ticket.
+  ///
+  /// Fails with the AssertEq status if the query is not "SELECT STATEMENT".
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
+      const ServerCallContext& context, const sql::StatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT STATEMENT", command.query));
+
+    ARROW_ASSIGN_OR_RAISE(auto handle,
+                          sql::CreateStatementQueryTicket("SELECT STATEMENT HANDLE"));
+
+    // The endpoint has a ticket but no locations; totals are reported as -1.
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{handle}, {}}};
+    ARROW_ASSIGN_OR_RAISE(
+        auto result, FlightInfo::Make(*GetQuerySchema(), descriptor, endpoints, -1, -1));
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
+      const ServerCallContext& context,
+      const sql::StatementQueryTicket& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPreparedStatement(
+      const ServerCallContext& context, const sql::PreparedStatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return GetFlightInfoForCommand(descriptor, GetQuerySchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPreparedStatement(
+      const ServerCallContext& context,
+      const sql::PreparedStatementQuery& command) override {
+    return DoGetForTestCase(GetQuerySchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCatalogs(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCatalogs(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCatalogsSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, command.info.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME, command.info[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(
+        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY, command.info[1]));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetSqlInfo(
+      const ServerCallContext& context, const sql::GetSqlInfo& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetSqlInfoSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetDbSchemas(
+      const ServerCallContext& context, const sql::GetDbSchemas& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetDbSchemasSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTables(
+      const ServerCallContext& context, const sql::GetTables& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
+                                              command.db_schema_filter_pattern.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table_filter_pattern",
+                                              command.table_name_filter_pattern.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<int>(2, command.table_types.size()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_types[0]));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("view", command.table_types[1]));
+    ARROW_RETURN_NOT_OK(AssertEq<bool>(true, command.include_schema));
+
+    return GetFlightInfoForCommand(descriptor,
+                                   sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTables(
+      const ServerCallContext& context, const sql::GetTables& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTableTypes(
+      const ServerCallContext& context, const FlightDescriptor& descriptor) override {
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetTableTypes(
+      const ServerCallContext& context) override {
+    return DoGetForTestCase(sql::SqlSchema::GetTableTypesSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetPrimaryKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPrimaryKeys(
+      const ServerCallContext& context, const sql::GetPrimaryKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetPrimaryKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetExportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetExportedKeys(
+      const ServerCallContext& context, const sql::GetExportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetExportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("db_schema", command.table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_ref.table));
+
+    return GetFlightInfoForCommand(descriptor, sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetImportedKeys(
+      const ServerCallContext& context, const sql::GetImportedKeys& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetImportedKeysSchema());
+  }
+
+  /// \brief Check the GetCrossReference command sent by the client and answer
+  /// with a FlightInfo advertising the cross-reference schema.
+  ///
+  /// Returns the first failing AssertEq status if any primary-key or foreign-key
+  /// table reference field differs from the literal the test client sends.
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_catalog", command.pk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("pk_db_schema", command.pk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_table", command.pk_table_ref.table));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_catalog", command.fk_table_ref.catalog.value()));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("fk_db_schema", command.fk_table_ref.db_schema.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_table", command.fk_table_ref.table));
+
+    // Advertise the same schema that DoGetCrossReference streams back.
+    return GetFlightInfoForCommand(descriptor,
+                                   sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetCrossReference(
+      const ServerCallContext& context, const sql::GetCrossReference& command) override {
+    return DoGetForTestCase(sql::SqlSchema::GetCrossReferenceSchema());
+  }
+
+  arrow::Result<int64_t> DoPutCommandStatementUpdate(
+      const ServerCallContext& context, const sql::StatementUpdate& command) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE STATEMENT", command.query));
+
+    return 10000;
+  }
+
+  arrow::Result<sql::ActionCreatePreparedStatementResult> CreatePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionCreatePreparedStatementRequest& request) override {
+    ARROW_RETURN_NOT_OK(
+        AssertEq<bool>(true, request.query == "SELECT PREPARED STATEMENT" ||
+                                 request.query == "UPDATE PREPARED STATEMENT"));
+
+    sql::ActionCreatePreparedStatementResult result;
+    result.prepared_statement_handle = request.query + " HANDLE";
+
+    return result;
+  }
+
+  Status ClosePreparedStatement(
+      const ServerCallContext& context,
+      const sql::ActionClosePreparedStatementRequest& request) override {
+    return Status::OK();
+  }
+
+  Status DoPutPreparedStatementQuery(const ServerCallContext& context,
+                                     const sql::PreparedStatementQuery& command,
+                                     FlightMessageReader* reader,
+                                     FlightMetadataWriter* writer) override {
+    return Status::NotImplemented("Not implemented");
+  }
+
+  arrow::Result<int64_t> DoPutPreparedStatementUpdate(
+      const ServerCallContext& context, const sql::PreparedStatementUpdate& command,
+      FlightMessageReader* reader) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE PREPARED STATEMENT HANDLE",
+                                              command.prepared_statement_handle));
+
+    return 20000;
+  }
+
+ private:
+  /// \brief Build a FlightInfo for a metadata command.
+  ///
+  /// The single endpoint reuses the descriptor's command bytes as its ticket
+  /// and has no locations; record and byte totals are reported as -1.
+  ///
+  /// \param[in] descriptor the descriptor the client sent
+  /// \param[in] schema the schema to advertise in the FlightInfo
+  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
+      const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
+    std::vector<FlightEndpoint> endpoints{FlightEndpoint{{descriptor.cmd}, {}}};
+    ARROW_ASSIGN_OR_RAISE(auto result,
+                          FlightInfo::Make(*schema, descriptor, endpoints, -1, -1));
+
+    return std::unique_ptr<FlightInfo>(new FlightInfo(result));
+  }
+
+  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetForTestCase(
+      std::shared_ptr<Schema> schema) {
+    ARROW_ASSIGN_OR_RAISE(auto reader2, RecordBatchReader::Make({}, schema));
+    return std::unique_ptr<FlightDataStream>(new RecordBatchStream(reader2));
+  }
+};
+
+/// \brief Integration test scenario for validating Flight SQL specs across multiple
+/// implementations. This should ensure that RPC objects are being built and parsed
+/// correctly for multiple languages and that the Arrow schemas are returned as expected.
+class FlightSqlScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    server->reset(new FlightSqlScenarioServer());
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }
+
+  /// \brief Fetch the first endpoint of a FlightInfo and check the schema of
+  /// the returned stream.
+  ///
+  /// \param[in] expectedSchema the schema the server is expected to return
+  /// \param[in] flightInfo result of a FlightInfo-returning client call; an
+  ///   error held by it is propagated unchanged
+  /// \param[in] sql_client the client used to issue the DoGet
+  /// \return OK if the schemas are equal, Invalid if they differ or if the
+  ///   FlightInfo has no endpoint, otherwise the error of the failing call
+  Status Validate(std::shared_ptr<Schema> expectedSchema,
+                  arrow::Result<std::unique_ptr<FlightInfo>> flightInfo,
+                  sql::FlightSqlClient* sql_client) {
+    FlightCallOptions call_options;
+
+    ARROW_ASSIGN_OR_RAISE(auto flight_info, flightInfo);
+    if (flight_info->endpoints().empty()) {
+      return Status::Invalid("Expected FlightInfo to have at least one endpoint");
+    }
+    ARROW_ASSIGN_OR_RAISE(
+        auto reader, sql_client->DoGet(call_options, flight_info->endpoints()[0].ticket));
+
+    ARROW_ASSIGN_OR_RAISE(auto actual_schema, reader->GetSchema());
+
+    // Report a mismatch through the returned Status so the scenario fails,
+    // instead of calling an assertion helper whose outcome is not visible here.
+    if (!expectedSchema->Equals(*actual_schema)) {
+      return Status::Invalid("Schemas are not equal. Expected:\n",
+                             expectedSchema->ToString(), "\nActual:\n",
+                             actual_schema->ToString());
+    }
+
+    return Status::OK();
+  }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    sql::FlightSqlClient sql_client(std::move(client));
+
+    ARROW_RETURN_NOT_OK(ValidateMetadataRetrieval(&sql_client));
+
+    ARROW_RETURN_NOT_OK(ValidateStatementExecution(&sql_client));
+
+    ARROW_RETURN_NOT_OK(ValidatePreparedStatementExecution(&sql_client));
+
+    return Status::OK();
+  }
+
+  Status ValidateMetadataRetrieval(sql::FlightSqlClient* sql_client) {
+    FlightCallOptions options;
+
+    std::string catalog = "catalog";
+    std::string db_schema_filter_pattern = "db_schema_filter_pattern";
+    std::string table_filter_pattern = "table_filter_pattern";
+    std::string table = "table";
+    std::string db_schema = "db_schema";
+    std::vector<std::string> table_types = {"table", "view"};
+
+    sql::TableRef table_ref = {catalog, db_schema, table};
+    sql::TableRef pk_table_ref = {"pk_catalog", "pk_db_schema", "pk_table"};
+    sql::TableRef fk_table_ref = {"fk_catalog", "fk_db_schema", "fk_table"};
+
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetCatalogsSchema(),
+                                 sql_client->GetCatalogs(options), sql_client));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetDbSchemasSchema(),
+                 sql_client->GetDbSchemas(options, &catalog, &db_schema_filter_pattern),
+                 sql_client));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetTablesSchemaWithIncludedSchema(),
+                 sql_client->GetTables(options, &catalog, &db_schema_filter_pattern,
+                                       &table_filter_pattern, true, &table_types),
+                 sql_client));
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetTableTypesSchema(),
+                                 sql_client->GetTableTypes(options), sql_client));
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetPrimaryKeysSchema(),
+                                 sql_client->GetPrimaryKeys(options, table_ref),
+                                 sql_client));
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetExportedKeysSchema(),
+                                 sql_client->GetExportedKeys(options, table_ref),
+                                 sql_client));
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetImportedKeysSchema(),
+                                 sql_client->GetImportedKeys(options, table_ref),
+                                 sql_client));
+    ARROW_RETURN_NOT_OK(Validate(
+        sql::SqlSchema::GetCrossReferenceSchema(),
+        sql_client->GetCrossReference(options, pk_table_ref, fk_table_ref), sql_client));
+    ARROW_RETURN_NOT_OK(Validate(
+        sql::SqlSchema::GetSqlInfoSchema(),
+        sql_client->GetSqlInfo(
+            options, {sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME,
+                      sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY}),
+        sql_client));
+
+    return Status::OK();
+  }
+
+  Status ValidateStatementExecution(sql::FlightSqlClient* sql_client) {
+    FlightCallOptions options;
+
+    ARROW_RETURN_NOT_OK(Validate(
+            GetQuerySchema(), sql_client->Execute(options, "SELECT STATEMENT"), sql_client));
+    ARROW_ASSIGN_OR_RAISE(auto update_statement_result,
+                          sql_client->ExecuteUpdate(options, "UPDATE STATEMENT"));
+    if (update_statement_result != 10000L) {
+      return Status::Invalid("Expected 'UPDATE STATEMENT' return 10000, got ",
+                             update_statement_result);
+    }
+
+    return Status::OK();
+  }
+
+  Status ValidatePreparedStatementExecution(sql::FlightSqlClient* sql_client) {
+    FlightCallOptions options;
+
+    ARROW_ASSIGN_OR_RAISE(auto select_prepared_statement,
+                          sql_client->Prepare(options, "SELECT PREPARED STATEMENT"));
+    ARROW_RETURN_NOT_OK(
+            Validate(GetQuerySchema(), select_prepared_statement->Execute(), sql_client));
+
+    ARROW_ASSIGN_OR_RAISE(auto update_prepared_statement,
+                          sql_client->Prepare(options, "UPDATE PREPARED STATEMENT"));
+    ARROW_ASSIGN_OR_RAISE(auto update_prepared_statement_result,
+                          update_prepared_statement->ExecuteUpdate());
+    if (update_prepared_statement_result != 20000L) {
+      return Status::Invalid("Expected 'UPDATE STATEMENT' return 20000, got ",
+                             update_prepared_statement_result);
+    }
+
+    return Status::OK();

Review comment:
       Can we explicitly close the prepared statement here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org