You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/06/02 17:31:47 UTC
[arrow-cookbook] branch main updated: [Java] Fix memory management issues in Flight example (#219)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git
The following commit(s) were added to refs/heads/main by this push:
new b15ff49 [Java] Fix memory management issues in Flight example (#219)
b15ff49 is described below
commit b15ff4956728e0a1ccb3e4bd9fbe890951ed09a8
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jun 2 13:31:43 2022 -0400
[Java] Fix memory management issues in Flight example (#219)
Fixes #218.
---
java/source/flight.rst | 57 +++++++++++++++++++++++++++++++++-----------------
1 file changed, 38 insertions(+), 19 deletions(-)
diff --git a/java/source/flight.rst b/java/source/flight.rst
index bdaf81b..8bc3df5 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -52,6 +52,7 @@ Flight Client and Server
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -71,7 +72,7 @@ Flight Client and Server
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
- class Dataset {
+ class Dataset implements AutoCloseable {
private final List<ArrowRecordBatch> batches;
private final Schema schema;
private final long rows;
@@ -89,11 +90,15 @@ Flight Client and Server
public long getRows() {
return rows;
}
+ @Override
+ public void close() throws Exception {
+ AutoCloseables.close(batches);
+ }
}
- class CookbookProducer extends NoOpFlightProducer {
+ class CookbookProducer extends NoOpFlightProducer implements AutoCloseable {
private final BufferAllocator allocator;
private final Location location;
- private final ConcurrentHashMap<FlightDescriptor, Dataset> datasets;
+ private final ConcurrentMap<FlightDescriptor, Dataset> datasets;
public CookbookProducer(BufferAllocator allocator, Location location) {
this.allocator = allocator;
this.location = location;
@@ -107,10 +112,9 @@ Flight Client and Server
VectorUnloader unloader;
while (flightStream.next()) {
unloader = new VectorUnloader(flightStream.getRoot());
- try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
- batches.add(arb);
- rows += flightStream.getRoot().getRowCount();
- }
+ final ArrowRecordBatch arb = unloader.getRecordBatch();
+ batches.add(arb);
+ rows += flightStream.getRoot().getRowCount();
}
Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
datasets.put(flightStream.getDescriptor(), dataset);
@@ -125,13 +129,13 @@ Flight Client and Server
Dataset dataset = this.datasets.get(flightDescriptor);
if (dataset == null) {
throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
- } else {
- VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(
- this.datasets.get(flightDescriptor).getSchema(), allocator);
- listener.start(vectorSchemaRoot);
+ }
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(
+ this.datasets.get(flightDescriptor).getSchema(), allocator)) {
+ VectorLoader loader = new VectorLoader(root);
+ listener.start(root);
for (ArrowRecordBatch arrowRecordBatch : this.datasets.get(flightDescriptor).getBatches()) {
- VectorLoader loader = new VectorLoader(vectorSchemaRoot);
- loader.load(arrowRecordBatch.cloneWithTransfer(allocator));
+ loader.load(arrowRecordBatch);
listener.putNext();
}
listener.completed();
@@ -143,8 +147,17 @@ Flight Client and Server
FlightDescriptor flightDescriptor = FlightDescriptor.path(
new String(action.getBody(), StandardCharsets.UTF_8));
switch (action.getType()) {
- case "DELETE":
- if (datasets.remove(flightDescriptor) != null) {
+ case "DELETE": {
+ Dataset removed = datasets.remove(flightDescriptor);
+ if (removed != null) {
+ try {
+ removed.close();
+ } catch (Exception e) {
+ listener.onError(CallStatus.INTERNAL
+ .withDescription(e.toString())
+ .toRuntimeException());
+ return;
+ }
Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
listener.onNext(result);
} else {
@@ -153,6 +166,7 @@ Flight Client and Server
listener.onNext(result);
}
listener.onCompleted();
+ }
}
}
@@ -174,17 +188,22 @@ Flight Client and Server
datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
listener.onCompleted();
}
+
+ @Override
+ public void close() throws Exception {
+ AutoCloseables.close(datasets.values());
+ }
}
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
try (BufferAllocator allocator = new RootAllocator()){
// Server
- try(FlightServer flightServer = FlightServer.builder(allocator, location,
- new CookbookProducer(allocator, location)).build()) {
+ try(final CookbookProducer producer = new CookbookProducer(allocator, location);
+ final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
try {
flightServer.start();
System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
} catch (IOException e) {
- System.exit(1);
+ throw new RuntimeException(e);
}
// Client
@@ -534,4 +553,4 @@ Stop Flight Server
C8: Server shut down successfully
-_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html
\ No newline at end of file
+_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html