You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/06/02 17:31:47 UTC

[arrow-cookbook] branch main updated: [Java] Fix memory management issues in Flight example (#219)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new b15ff49  [Java] Fix memory management issues in Flight example (#219)
b15ff49 is described below

commit b15ff4956728e0a1ccb3e4bd9fbe890951ed09a8
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jun 2 13:31:43 2022 -0400

    [Java] Fix memory management issues in Flight example (#219)
    
    Fixes #218.
---
 java/source/flight.rst | 57 +++++++++++++++++++++++++++++++++-----------------
 1 file changed, 38 insertions(+), 19 deletions(-)

diff --git a/java/source/flight.rst b/java/source/flight.rst
index bdaf81b..8bc3df5 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -52,6 +52,7 @@ Flight Client and Server
     import org.apache.arrow.flight.Ticket;
     import org.apache.arrow.memory.BufferAllocator;
     import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.util.AutoCloseables;
     import org.apache.arrow.vector.VarCharVector;
     import org.apache.arrow.vector.VectorLoader;
     import org.apache.arrow.vector.VectorSchemaRoot;
@@ -71,7 +72,7 @@ Flight Client and Server
     import java.util.List;
     import java.util.concurrent.ConcurrentHashMap;
 
-    class Dataset {
+    class Dataset implements AutoCloseable {
         private final List<ArrowRecordBatch> batches;
         private final Schema schema;
         private final long rows;
@@ -89,11 +90,15 @@ Flight Client and Server
         public long getRows() {
             return rows;
         }
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(batches);
+        }
     }
-    class CookbookProducer extends NoOpFlightProducer {
+    class CookbookProducer extends NoOpFlightProducer implements AutoCloseable {
         private final BufferAllocator allocator;
         private final Location location;
-        private final ConcurrentHashMap<FlightDescriptor, Dataset> datasets;
+        private final ConcurrentMap<FlightDescriptor, Dataset> datasets;
         public CookbookProducer(BufferAllocator allocator, Location location) {
             this.allocator = allocator;
             this.location = location;
@@ -107,10 +112,9 @@ Flight Client and Server
                 VectorUnloader unloader;
                 while (flightStream.next()) {
                     unloader = new VectorUnloader(flightStream.getRoot());
-                    try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
-                        batches.add(arb);
-                        rows += flightStream.getRoot().getRowCount();
-                    }
+                    final ArrowRecordBatch arb = unloader.getRecordBatch();
+                    batches.add(arb);
+                    rows += flightStream.getRoot().getRowCount();
                 }
                 Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows);
                 datasets.put(flightStream.getDescriptor(), dataset);
@@ -125,13 +129,13 @@ Flight Client and Server
             Dataset dataset = this.datasets.get(flightDescriptor);
             if (dataset == null) {
                 throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException();
-            } else {
-                VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(
-                        this.datasets.get(flightDescriptor).getSchema(), allocator);
-                listener.start(vectorSchemaRoot);
+            }
+            try (VectorSchemaRoot root = VectorSchemaRoot.create(
+                     this.datasets.get(flightDescriptor).getSchema(), allocator)) {
+                VectorLoader loader = new VectorLoader(root);
+                listener.start(root);
                 for (ArrowRecordBatch arrowRecordBatch : this.datasets.get(flightDescriptor).getBatches()) {
-                    VectorLoader loader = new VectorLoader(vectorSchemaRoot);
-                    loader.load(arrowRecordBatch.cloneWithTransfer(allocator));
+                    loader.load(arrowRecordBatch);
                     listener.putNext();
                 }
                 listener.completed();
@@ -143,8 +147,17 @@ Flight Client and Server
             FlightDescriptor flightDescriptor = FlightDescriptor.path(
                     new String(action.getBody(), StandardCharsets.UTF_8));
             switch (action.getType()) {
-                case "DELETE":
-                    if (datasets.remove(flightDescriptor) != null) {
+                case "DELETE": {
+                    Dataset removed = datasets.remove(flightDescriptor);
+                    if (removed != null) {
+                        try {
+                            removed.close();
+                        } catch (Exception e) {
+                            listener.onError(CallStatus.INTERNAL
+                                .withDescription(e.toString())
+                                .toRuntimeException());
+                            return;
+                        }
                         Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8));
                         listener.onNext(result);
                     } else {
@@ -153,6 +166,7 @@ Flight Client and Server
                         listener.onNext(result);
                     }
                     listener.onCompleted();
+                }
             }
         }
 
@@ -174,17 +188,22 @@ Flight Client and Server
             datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); });
             listener.onCompleted();
         }
+
+        @Override
+        public void close() throws Exception {
+            AutoCloseables.close(datasets.values());
+        }
     }
     Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
     try (BufferAllocator allocator = new RootAllocator()){
         // Server
-        try(FlightServer flightServer = FlightServer.builder(allocator, location,
-                new CookbookProducer(allocator, location)).build()) {
+        try(final CookbookProducer producer = new CookbookProducer(allocator, location);
+            final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
             try {
                 flightServer.start();
                 System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
             } catch (IOException e) {
-                System.exit(1);
+                throw new RuntimeException(e);
             }
 
             // Client
@@ -534,4 +553,4 @@ Stop Flight Server
 
     C8: Server shut down successfully
 
-_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html
\ No newline at end of file
+_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html