You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2020/08/15 04:23:05 UTC

[arrow] branch master updated: ARROW-9358: [Integration] remove generated_large_batch.json

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 222859d  ARROW-9358: [Integration] remove generated_large_batch.json
222859d is described below

commit 222859de38fb0b286b1c44fbd873ca2eeb335858
Author: David Li <li...@gmail.com>
AuthorDate: Fri Aug 14 21:22:40 2020 -0700

    ARROW-9358: [Integration] remove generated_large_batch.json
    
    This should speed up integration tests by moving the expensive large batch test to the individual Flight implementations.
    
    Closes #7908 from lidavidm/arrow-9358
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 cpp/src/arrow/flight/flight_test.cc                | 28 +++++++-
 cpp/src/arrow/flight/test_util.cc                  | 29 +++++++++
 cpp/src/arrow/flight/test_util.h                   |  6 ++
 dev/archery/archery/integration/datagen.py         |  7 +-
 dev/archery/archery/integration/runner.py          |  5 +-
 .../apache/arrow/flight/TestBasicOperation.java    | 74 +++++++++++++++++++++-
 6 files changed, 135 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index 3808699..cb88d85 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -354,8 +354,6 @@ class TestFlightClient : public ::testing::Test {
   template <typename EndpointCheckFunc>
   void CheckDoGet(const FlightDescriptor& descr, const BatchVector& expected_batches,
                   EndpointCheckFunc&& check_endpoints) {
-    auto num_batches = static_cast<int>(expected_batches.size());
-    ASSERT_GE(num_batches, 2);
     auto expected_schema = expected_batches[0]->schema();
 
     std::unique_ptr<FlightInfo> info;
@@ -369,6 +367,13 @@ class TestFlightClient : public ::testing::Test {
 
     // By convention, fetch the first endpoint
     Ticket ticket = info->endpoints()[0].ticket;
+    CheckDoGet(ticket, expected_batches);
+  }
+
+  void CheckDoGet(const Ticket& ticket, const BatchVector& expected_batches) {
+    auto num_batches = static_cast<int>(expected_batches.size());
+    ASSERT_GE(num_batches, 2);
+
     std::unique_ptr<FlightStreamReader> stream;
     ASSERT_OK(client_->DoGet(ticket, &stream));
 
@@ -1105,6 +1110,15 @@ TEST_F(TestFlightClient, DoGetDicts) {
   CheckDoGet(descr, expected_batches, check_endpoints);
 }
 
+// Ensures the gRPC client is configured to allow large messages by
+// fetching the 32 MiB example batches through DoGet.
+TEST_F(TestFlightClient, DoGetLargeBatch) {
+  Ticket large_ticket{"ticket-large-batch-1"};
+  BatchVector large_batches;
+  ASSERT_OK(ExampleLargeBatches(&large_batches));
+  CheckDoGet(large_ticket, large_batches);
+}
+
 TEST_F(TestFlightClient, DoExchange) {
   auto descr = FlightDescriptor::Command("counter");
   BatchVector batches;
@@ -1515,6 +1529,16 @@ TEST_F(TestDoPut, DoPutDicts) {
   CheckDoPut(descr, schema, batches);
 }
 
+// Ensures the gRPC server is configured to allow large messages by
+// uploading the 32 MiB example batches through DoPut.
+TEST_F(TestDoPut, DoPutLargeBatch) {
+  BatchVector large_batches;
+  ASSERT_OK(ExampleLargeBatches(&large_batches));
+  auto large_schema = ExampleLargeSchema();
+  auto descriptor = FlightDescriptor::Path({"large-batches"});
+  CheckDoPut(descriptor, large_schema, large_batches);
+}
+
 TEST_F(TestDoPut, DoPutSizeLimit) {
   const int64_t size_limit = 4096;
   Location location;
diff --git a/cpp/src/arrow/flight/test_util.cc b/cpp/src/arrow/flight/test_util.cc
index 302fda1..f398a1d 100644
--- a/cpp/src/arrow/flight/test_util.cc
+++ b/cpp/src/arrow/flight/test_util.cc
@@ -38,6 +38,7 @@
 #include <gtest/gtest.h>
 
 #include "arrow/ipc/test_common.h"
+#include "arrow/testing/generator.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
 #include "arrow/util/logging.h"
@@ -156,6 +157,11 @@ Status GetBatchForFlight(const Ticket& ticket, std::shared_ptr<RecordBatchReader
     RETURN_NOT_OK(ExampleDictBatches(&batches));
     *out = std::make_shared<BatchIterator>(batches[0]->schema(), batches);
     return Status::OK();
+  } else if (ticket.ticket == "ticket-large-batch-1") {
+    BatchVector batches;
+    RETURN_NOT_OK(ExampleLargeBatches(&batches));
+    *out = std::make_shared<BatchIterator>(batches[0]->schema(), batches);
+    return Status::OK();
   } else {
     return Status::NotImplemented("no stream implemented for ticket: " + ticket.ticket);
   }
@@ -504,6 +510,15 @@ std::shared_ptr<Schema> ExampleDictSchema() {
   return batch->schema();
 }
 
+// Returns a schema of 128 float64 columns named "f0" through "f127".
+std::shared_ptr<Schema> ExampleLargeSchema() {
+  std::vector<std::shared_ptr<arrow::Field>> columns;
+  for (int idx = 0; idx < 128; ++idx) {
+    columns.push_back(arrow::field("f" + std::to_string(idx), arrow::float64()));
+  }
+  return arrow::schema(columns);
+}
+
 std::vector<FlightInfo> ExampleFlightInfo() {
   Location location1;
   Location location2;
@@ -582,6 +597,20 @@ Status ExampleNestedBatches(BatchVector* out) {
   return Status::OK();
 }
 
+// Appends two identical batches of 32768 rows using ExampleLargeSchema().
+Status ExampleLargeBatches(BatchVector* out) {
+  const auto array_length = 32768;
+  auto schema = ExampleLargeSchema();
+  // One constant array backs every column; the column count is taken from
+  // the schema so the batches cannot drift out of sync with it.
+  const auto arr = arrow::ConstantArrayGenerator::Float64(array_length, 1.0);
+  std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields(), arr);
+  for (int i = 0; i < 2; i++) {
+    out->push_back(RecordBatch::Make(schema, array_length, arrays));
+  }
+  return Status::OK();
+}
+
 std::vector<ActionType> ExampleActionTypes() {
   return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}};
 }
diff --git a/cpp/src/arrow/flight/test_util.h b/cpp/src/arrow/flight/test_util.h
index 5cb5267..38086f6 100644
--- a/cpp/src/arrow/flight/test_util.h
+++ b/cpp/src/arrow/flight/test_util.h
@@ -141,6 +141,9 @@ ARROW_FLIGHT_EXPORT
 std::shared_ptr<Schema> ExampleDictSchema();
 
 ARROW_FLIGHT_EXPORT
+std::shared_ptr<Schema> ExampleLargeSchema();
+
+ARROW_FLIGHT_EXPORT
 Status ExampleIntBatches(BatchVector* out);
 
 ARROW_FLIGHT_EXPORT
@@ -153,6 +156,9 @@ ARROW_FLIGHT_EXPORT
 Status ExampleNestedBatches(BatchVector* out);
 
 ARROW_FLIGHT_EXPORT
+Status ExampleLargeBatches(BatchVector* out);
+
+ARROW_FLIGHT_EXPORT
 std::vector<FlightInfo> ExampleFlightInfo();
 
 ARROW_FLIGHT_EXPORT
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 39c4b43..69f463b 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1485,7 +1485,7 @@ def generate_extension_case():
                           dictionaries=[dict0])
 
 
-def get_generated_json_files(tempdir=None, flight=False):
+def get_generated_json_files(tempdir=None):
     tempdir = tempdir or tempfile.mkdtemp(prefix='arrow-integration-')
 
     def _temp_path():
@@ -1583,11 +1583,6 @@ def get_generated_json_files(tempdir=None, flight=False):
         .skip_category('Rust'),
     ]
 
-    if flight:
-        file_objs.append(generate_primitive_case([24 * 1024],
-                                                 name='large_batch')
-                         .skip_category('Rust'))
-
     generated_paths = []
     for file_obj in file_objs:
         out_path = os.path.join(tempdir, 'generated_' +
diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py
index 3deb8d3..c1d7a69 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -337,10 +337,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
         testers.append(RustTester(**kwargs))
 
     static_json_files = get_static_json_files()
-    generated_json_files = datagen.get_generated_json_files(
-        tempdir=tempdir,
-        flight=run_flight
-    )
+    generated_json_files = datagen.get_generated_json_files(tempdir=tempdir)
     json_files = static_json_files + generated_json_files
 
     # Additional integration test cases for Arrow Flight.
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index 8242bc0..bae6582 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -20,10 +20,12 @@ package org.apache.arrow.flight;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -33,6 +35,8 @@ import org.apache.arrow.flight.impl.Flight;
 import org.apache.arrow.flight.impl.Flight.FlightDescriptor.DescriptorType;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -244,6 +248,50 @@ public class TestBasicOperation {
     });
   }
 
+  /** Ensure the client accepts large messages by reading two 65536-row, 128-column batches. */
+  @Test
+  public void getStreamLargeBatch() throws Exception {
+    test(client -> {
+      try (final FlightStream batches = client.getStream(new Ticket(Producer.TICKET_LARGE_BATCH))) {
+        Assert.assertEquals(128, batches.getRoot().getFieldVectors().size());
+        for (int batch = 0; batch < 2; batch++) {
+          Assert.assertTrue(batches.next());
+          Assert.assertEquals(65536, batches.getRoot().getRowCount());
+        }
+        Assert.assertFalse(batches.next());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /** Ensure the server accepts large messages by uploading two 65536-row, 128-column batches. */
+  @Test
+  public void startPutLargeBatch() throws Exception {
+    try (final BufferAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE)) {
+      final List<FieldVector> columns = new ArrayList<>();
+      for (int columnIndex = 0; columnIndex < 128; columnIndex++) {
+        final BigIntVector column = new BigIntVector("f" + columnIndex, rootAllocator);
+        for (int rowIndex = 0; rowIndex < 65536; rowIndex++) {
+          column.setSafe(rowIndex, rowIndex);
+        }
+        columns.add(column);
+      }
+      test(client -> {
+        try (final VectorSchemaRoot batch = new VectorSchemaRoot(columns)) {
+          batch.setRowCount(65536);
+          final ClientStreamListener put = client.startPut(FlightDescriptor.path(""), batch, new SyncPutListener());
+          put.putNext();
+          put.putNext();
+          put.completed();
+          put.getResult();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+    }
+  }
+
   private void test(Consumer<FlightClient> consumer) throws Exception {
     test((c, a) -> {
       consumer.accept(c);
@@ -273,6 +321,7 @@ public class TestBasicOperation {
    * An example FlightProducer for test purposes.
    */
   public static class Producer implements FlightProducer, AutoCloseable {
+    static final byte[] TICKET_LARGE_BATCH = "large-batch".getBytes(StandardCharsets.UTF_8);
 
     private final BufferAllocator allocator;
 
@@ -313,8 +362,11 @@ public class TestBasicOperation {
     }
 
     @Override
-    public void getStream(CallContext context, Ticket ticket,
-        ServerStreamListener listener) {
+    public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+      if (Arrays.equals(TICKET_LARGE_BATCH, ticket.getBytes())) {
+        getLargeBatch(listener);
+        return;
+      }
       final int size = 10;
 
       IntVector iv = new IntVector("c1", allocator);
@@ -343,6 +395,24 @@ public class TestBasicOperation {
       listener.completed();
     }
 
+    private void getLargeBatch(ServerStreamListener listener) {
+      final List<FieldVector> columns = new ArrayList<>();
+      for (int columnIndex = 0; columnIndex < 128; columnIndex++) {
+        final BigIntVector column = new BigIntVector("f" + columnIndex, allocator);
+        for (int rowIndex = 0; rowIndex < 65536; rowIndex++) {
+          column.setSafe(rowIndex, rowIndex);  // each cell holds its own row index
+        }
+        columns.add(column);
+      }
+      try (final VectorSchemaRoot batch = new VectorSchemaRoot(columns)) {
+        batch.setRowCount(65536);
+        listener.start(batch);
+        listener.putNext();  // the root is not modified between the two puts
+        listener.putNext();
+        listener.completed();
+      }
+    }
+
     @Override
     public void close() throws Exception {
       allocator.close();