You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by em...@apache.org on 2020/08/15 04:23:05 UTC
[arrow] branch master updated: ARROW-9358: [Integration] remove generated_large_batch.json
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 222859d ARROW-9358: [Integration] remove generated_large_batch.json
222859d is described below
commit 222859de38fb0b286b1c44fbd873ca2eeb335858
Author: David Li <li...@gmail.com>
AuthorDate: Fri Aug 14 21:22:40 2020 -0700
ARROW-9358: [Integration] remove generated_large_batch.json
This should speed up the integration tests by moving the expensive large-batch test out of the generated cross-language integration suite and into the individual Flight implementations' own unit tests.
Closes #7908 from lidavidm/arrow-9358
Authored-by: David Li <li...@gmail.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
cpp/src/arrow/flight/flight_test.cc | 28 +++++++-
cpp/src/arrow/flight/test_util.cc | 29 +++++++++
cpp/src/arrow/flight/test_util.h | 6 ++
dev/archery/archery/integration/datagen.py | 7 +-
dev/archery/archery/integration/runner.py | 5 +-
.../apache/arrow/flight/TestBasicOperation.java | 74 +++++++++++++++++++++-
6 files changed, 135 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index 3808699..cb88d85 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -354,8 +354,6 @@ class TestFlightClient : public ::testing::Test {
template <typename EndpointCheckFunc>
void CheckDoGet(const FlightDescriptor& descr, const BatchVector& expected_batches,
EndpointCheckFunc&& check_endpoints) {
- auto num_batches = static_cast<int>(expected_batches.size());
- ASSERT_GE(num_batches, 2);
auto expected_schema = expected_batches[0]->schema();
std::unique_ptr<FlightInfo> info;
@@ -369,6 +367,13 @@ class TestFlightClient : public ::testing::Test {
// By convention, fetch the first endpoint
Ticket ticket = info->endpoints()[0].ticket;
+ CheckDoGet(ticket, expected_batches);
+ }
+
+ void CheckDoGet(const Ticket& ticket, const BatchVector& expected_batches) {
+ auto num_batches = static_cast<int>(expected_batches.size());
+ ASSERT_GE(num_batches, 2);
+
std::unique_ptr<FlightStreamReader> stream;
ASSERT_OK(client_->DoGet(ticket, &stream));
@@ -1105,6 +1110,15 @@ TEST_F(TestFlightClient, DoGetDicts) {
CheckDoGet(descr, expected_batches, check_endpoints);
}
+// Ensure the gRPC client is configured to allow large messages
+// Tests a 32 MiB batch
+TEST_F(TestFlightClient, DoGetLargeBatch) {
+ BatchVector expected_batches;
+ ASSERT_OK(ExampleLargeBatches(&expected_batches));
+ Ticket ticket{"ticket-large-batch-1"};
+ CheckDoGet(ticket, expected_batches);
+}
+
TEST_F(TestFlightClient, DoExchange) {
auto descr = FlightDescriptor::Command("counter");
BatchVector batches;
@@ -1515,6 +1529,16 @@ TEST_F(TestDoPut, DoPutDicts) {
CheckDoPut(descr, schema, batches);
}
+// Ensure the gRPC server is configured to allow large messages
+// Tests a 32 MiB batch
+TEST_F(TestDoPut, DoPutLargeBatch) {
+ auto descr = FlightDescriptor::Path({"large-batches"});
+ auto schema = ExampleLargeSchema();
+ BatchVector batches;
+ ASSERT_OK(ExampleLargeBatches(&batches));
+ CheckDoPut(descr, schema, batches);
+}
+
TEST_F(TestDoPut, DoPutSizeLimit) {
const int64_t size_limit = 4096;
Location location;
diff --git a/cpp/src/arrow/flight/test_util.cc b/cpp/src/arrow/flight/test_util.cc
index 302fda1..f398a1d 100644
--- a/cpp/src/arrow/flight/test_util.cc
+++ b/cpp/src/arrow/flight/test_util.cc
@@ -38,6 +38,7 @@
#include <gtest/gtest.h>
#include "arrow/ipc/test_common.h"
+#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
#include "arrow/util/logging.h"
@@ -156,6 +157,11 @@ Status GetBatchForFlight(const Ticket& ticket, std::shared_ptr<RecordBatchReader
RETURN_NOT_OK(ExampleDictBatches(&batches));
*out = std::make_shared<BatchIterator>(batches[0]->schema(), batches);
return Status::OK();
+ } else if (ticket.ticket == "ticket-large-batch-1") {
+ BatchVector batches;
+ RETURN_NOT_OK(ExampleLargeBatches(&batches));
+ *out = std::make_shared<BatchIterator>(batches[0]->schema(), batches);
+ return Status::OK();
} else {
return Status::NotImplemented("no stream implemented for ticket: " + ticket.ticket);
}
@@ -504,6 +510,15 @@ std::shared_ptr<Schema> ExampleDictSchema() {
return batch->schema();
}
+std::shared_ptr<Schema> ExampleLargeSchema() {
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ for (int i = 0; i < 128; i++) {
+ const auto field_name = "f" + std::to_string(i);
+ fields.push_back(arrow::field(field_name, arrow::float64()));
+ }
+ return arrow::schema(fields);
+}
+
std::vector<FlightInfo> ExampleFlightInfo() {
Location location1;
Location location2;
@@ -582,6 +597,20 @@ Status ExampleNestedBatches(BatchVector* out) {
return Status::OK();
}
+Status ExampleLargeBatches(BatchVector* out) {
+ const auto array_length = 32768;
+ std::shared_ptr<RecordBatch> batch;
+ std::vector<std::shared_ptr<arrow::Array>> arrays;
+ const auto arr = arrow::ConstantArrayGenerator::Float64(array_length, 1.0);
+ for (int i = 0; i < 128; i++) {
+ arrays.push_back(arr);
+ }
+ auto schema = ExampleLargeSchema();
+ out->push_back(RecordBatch::Make(schema, array_length, arrays));
+ out->push_back(RecordBatch::Make(schema, array_length, arrays));
+ return Status::OK();
+}
+
std::vector<ActionType> ExampleActionTypes() {
return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}};
}
diff --git a/cpp/src/arrow/flight/test_util.h b/cpp/src/arrow/flight/test_util.h
index 5cb5267..38086f6 100644
--- a/cpp/src/arrow/flight/test_util.h
+++ b/cpp/src/arrow/flight/test_util.h
@@ -141,6 +141,9 @@ ARROW_FLIGHT_EXPORT
std::shared_ptr<Schema> ExampleDictSchema();
ARROW_FLIGHT_EXPORT
+std::shared_ptr<Schema> ExampleLargeSchema();
+
+ARROW_FLIGHT_EXPORT
Status ExampleIntBatches(BatchVector* out);
ARROW_FLIGHT_EXPORT
@@ -153,6 +156,9 @@ ARROW_FLIGHT_EXPORT
Status ExampleNestedBatches(BatchVector* out);
ARROW_FLIGHT_EXPORT
+Status ExampleLargeBatches(BatchVector* out);
+
+ARROW_FLIGHT_EXPORT
std::vector<FlightInfo> ExampleFlightInfo();
ARROW_FLIGHT_EXPORT
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 39c4b43..69f463b 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1485,7 +1485,7 @@ def generate_extension_case():
dictionaries=[dict0])
-def get_generated_json_files(tempdir=None, flight=False):
+def get_generated_json_files(tempdir=None):
tempdir = tempdir or tempfile.mkdtemp(prefix='arrow-integration-')
def _temp_path():
@@ -1583,11 +1583,6 @@ def get_generated_json_files(tempdir=None, flight=False):
.skip_category('Rust'),
]
- if flight:
- file_objs.append(generate_primitive_case([24 * 1024],
- name='large_batch')
- .skip_category('Rust'))
-
generated_paths = []
for file_obj in file_objs:
out_path = os.path.join(tempdir, 'generated_' +
diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py
index 3deb8d3..c1d7a69 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -337,10 +337,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
testers.append(RustTester(**kwargs))
static_json_files = get_static_json_files()
- generated_json_files = datagen.get_generated_json_files(
- tempdir=tempdir,
- flight=run_flight
- )
+ generated_json_files = datagen.get_generated_json_files(tempdir=tempdir)
json_files = static_json_files + generated_json_files
# Additional integration test cases for Arrow Flight.
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index 8242bc0..bae6582 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -20,10 +20,12 @@ package org.apache.arrow.flight;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -33,6 +35,8 @@ import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.Flight.FlightDescriptor.DescriptorType;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -244,6 +248,50 @@ public class TestBasicOperation {
});
}
+ /** Ensure the client is configured to accept large messages. */
+ @Test
+ public void getStreamLargeBatch() throws Exception {
+ test(c -> {
+ try (final FlightStream stream = c.getStream(new Ticket(Producer.TICKET_LARGE_BATCH))) {
+ Assert.assertEquals(128, stream.getRoot().getFieldVectors().size());
+ Assert.assertTrue(stream.next());
+ Assert.assertEquals(65536, stream.getRoot().getRowCount());
+ Assert.assertTrue(stream.next());
+ Assert.assertEquals(65536, stream.getRoot().getRowCount());
+ Assert.assertFalse(stream.next());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /** Ensure the server is configured to accept large messages. */
+ @Test
+ public void startPutLargeBatch() throws Exception {
+ try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
+ final List<FieldVector> vectors = new ArrayList<>();
+ for (int col = 0; col < 128; col++) {
+ final BigIntVector vector = new BigIntVector("f" + col, allocator);
+ for (int row = 0; row < 65536; row++) {
+ vector.setSafe(row, row);
+ }
+ vectors.add(vector);
+ }
+ test(c -> {
+ try (final VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
+ root.setRowCount(65536);
+ final ClientStreamListener stream = c.startPut(FlightDescriptor.path(""), root, new SyncPutListener());
+ stream.putNext();
+ stream.putNext();
+ stream.completed();
+ stream.getResult();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
private void test(Consumer<FlightClient> consumer) throws Exception {
test((c, a) -> {
consumer.accept(c);
@@ -273,6 +321,7 @@ public class TestBasicOperation {
* An example FlightProducer for test purposes.
*/
public static class Producer implements FlightProducer, AutoCloseable {
+ static final byte[] TICKET_LARGE_BATCH = "large-batch".getBytes(StandardCharsets.UTF_8);
private final BufferAllocator allocator;
@@ -313,8 +362,11 @@ public class TestBasicOperation {
}
@Override
- public void getStream(CallContext context, Ticket ticket,
- ServerStreamListener listener) {
+ public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+ if (Arrays.equals(TICKET_LARGE_BATCH, ticket.getBytes())) {
+ getLargeBatch(listener);
+ return;
+ }
final int size = 10;
IntVector iv = new IntVector("c1", allocator);
@@ -343,6 +395,24 @@ public class TestBasicOperation {
listener.completed();
}
+ private void getLargeBatch(ServerStreamListener listener) {
+ final List<FieldVector> vectors = new ArrayList<>();
+ for (int col = 0; col < 128; col++) {
+ final BigIntVector vector = new BigIntVector("f" + col, allocator);
+ for (int row = 0; row < 65536; row++) {
+ vector.setSafe(row, row);
+ }
+ vectors.add(vector);
+ }
+ try (final VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
+ root.setRowCount(65536);
+ listener.start(root);
+ listener.putNext();
+ listener.putNext();
+ listener.completed();
+ }
+ }
+
@Override
public void close() throws Exception {
allocator.close();