Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/26 12:39:22 UTC

[GitHub] [arrow-cookbook] iamsmkr opened a new issue, #218: Cookbook Java example does not work as expected with multiple batches

iamsmkr opened a new issue, #218:
URL: https://github.com/apache/arrow-cookbook/issues/218

   I am trying out the Java cookbook example [here](https://arrow.apache.org/cookbook/java/flight.html). The only change is that I write multiple batches instead of one; see the "Batch" comments in the code.
   
   When I run this example I see unexpected **overlapping results**: the client receives the values 30 to 59 twice instead of 0 to 59 once. It gets even weirder with multi-threading.
   What is the correct way to send multiple batches?
   
   ```
   S1: Server (Location): Listening on port 33333
   C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
   WARNING: An illegal reflective access operation has occurred
   WARNING: Illegal reflective access by org.apache.arrow.memory.util.MemoryUtil (file:/Users/rentsher/.m2/repository/org/apache/arrow/arrow-memory-core/8.0.0/arrow-memory-core-8.0.0.jar) to field java.nio.Buffer.address
   WARNING: Please consider reporting this to the maintainers of org.apache.arrow.memory.util.MemoryUtil
   WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
   WARNING: All illegal access operations will be denied in a future release
   C2: Client (Populate Data): Wrote 2 batches with 3 rows each
   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
   C4: Client (Get Stream):
   Client Received batch #1, Data:
   vector size: 10
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   Client Received batch #2, Data:
   vector size: 10
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   Client Received batch #3, Data:
   vector size: 10
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   Client Received batch #4, Data:
   vector size: 10
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   Client Received batch #5, Data:
   vector size: 10
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   Client Received batch #6, Data:
   vector size: 10
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
   C6: Client (Do Delete Action): Delete completed
   C7: Client (List Flights Info): After delete - No records
   C8: Server shut down successfully
   
   Process finished with exit code 0
   ```
   
   ```java
   package com.iamsmkr.arrowflight;
   
   import org.apache.arrow.flight.Action;
   import org.apache.arrow.flight.AsyncPutListener;
   import org.apache.arrow.flight.Criteria;
   import org.apache.arrow.flight.FlightClient;
   import org.apache.arrow.flight.FlightDescriptor;
   import org.apache.arrow.flight.FlightInfo;
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.FlightStream;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.flight.Result;
   import org.apache.arrow.flight.Ticket;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.BigIntVector;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.types.pojo.ArrowType;
   import org.apache.arrow.vector.types.pojo.Field;
   import org.apache.arrow.vector.types.pojo.FieldType;
   import org.apache.arrow.vector.types.pojo.Schema;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.Iterator;
   
   public class CookbookApp {
   
       public static void main(String[] args) {
   
           Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
           try (BufferAllocator allocator = new RootAllocator()) {
               // Server
               try (FlightServer flightServer = FlightServer.builder(allocator, location, new ArrowFlightProducer(allocator, location)).build()) {
                   try {
                       flightServer.start();
                       System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
                   } catch (IOException e) {
                       System.exit(1);
                   }
   
                   // Client
                   try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
                       System.out.println("C1: Client (Location): Connected to " + location.getUri());
   
                       // Populate data
                       Schema schema = new Schema(Arrays.asList(
                               new Field("name", new FieldType(false, new ArrowType.Int(64, true), null), null)));
   
                       try (
                               VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                               BigIntVector names = (BigIntVector) vectorSchemaRoot.getVector("name")
                       ) {
                           FlightClient.ClientStreamListener listener =
                                   flightClient.startPut(
                                           FlightDescriptor.path("profiles"),
                                           vectorSchemaRoot,
                                           new AsyncPutListener()
                                   );
   
                           // Batches 1-6: reuse row indices 0-9 for every batch,
                           // writing the values 0-9, 10-19, ..., 50-59
                           for (int batchIndex = 0; batchIndex < 6; batchIndex++) {
                               for (int j = 0; j < 10; j++) {
                                   names.setSafe(j, batchIndex * 10L + j);
                               }
                               vectorSchemaRoot.setRowCount(10);
   
                               // Wait until the transport can accept another batch
                               while (!listener.isReady()) {
                                   try {
                                       Thread.sleep(1);
                                   } catch (InterruptedException e) {
                                       e.printStackTrace();
                                   }
                               }
   
                               listener.putNext();
                           }
   
                           listener.completed();
                           listener.getResult();
                           
                           System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
                       }
   
                       // Get metadata information
                       FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
                       System.out.println("C3: Client (Get Metadata): " + flightInfo);
   
                       // Get data information
                       try (FlightStream flightStream = flightClient.getStream(new Ticket(
                               FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) {
                           int batch = 0;
                           try (
                                   VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot();
                                   BigIntVector names = (BigIntVector) vectorSchemaRootReceived.getVector("name")
                           ) {
                               System.out.println("C4: Client (Get Stream):");
                               while (flightStream.next()) {
                                   batch++;
                                   System.out.println("Client Received batch #" + batch + ", Data:");
                                   // Alternative: System.out.print(vectorSchemaRootReceived.contentToTSVString());
                                   int rowCount = vectorSchemaRootReceived.getRowCount();
                                   System.out.println("vector size: " + rowCount);
                                   for (int j = 0; j < rowCount; j++) {
                                       System.out.println(names.get(j));
                                   }
                               }
                           }
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
   
                       // Get all metadata information
                       Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
                       System.out.print("C5: Client (List Flights Info): ");
                       flightInfosBefore.forEach(t -> System.out.println(t));
   
                       // Do delete action
                       Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
                               FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
                       while (deleteActionResult.hasNext()) {
                           Result result = deleteActionResult.next();
                           System.out.println("C6: Client (Do Delete Action): " +
                                   new String(result.getBody(), StandardCharsets.UTF_8));
                       }
   
                       // Get all metadata information (to validate the delete action)
                       Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
                       flightInfos.forEach(System.out::println);
                       System.out.println("C7: Client (List Flights Info): After delete - No records");
   
                       // Server shut down
                       flightServer.shutdown();
                       System.out.println("C8: Server shut down successfully");
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-cookbook] lidavidm closed issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm closed issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
URL: https://github.com/apache/arrow-cookbook/issues/218



[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138657348

   @lidavidm What's wrong in the code above? How can I fix it in my code?



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138601029

   Oh wait, I misread the code. Let me take another look…



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138668166

   I will put up a PR that fixes the server in the cookbook. The server is closing the ArrowRecordBatch in acceptPut when it should not be.
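   The following JDK-only model shows this failure mode concretely. It is hypothetical: `Pool`, `receive`, and `flatten` are illustrative names. The pool only stands in for an allocator that recycles freed memory; it is not the Arrow allocator or the cookbook producer. If a stored batch's memory is released while the server still holds a reference to it, later batches are written into the recycled memory. The stored batches then alias each other, and the same values are served more than once. The model does not try to reproduce the exact 30 to 59 pattern reported above.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Deque;
   import java.util.List;

   // Simplified, hypothetical model of a use-after-free; this is NOT the Arrow allocator API.
   public class UseAfterFreeModel {

       // Toy buffer pool: released arrays are handed out again, like an allocator reusing memory.
       static final class Pool {
           private final Deque<long[]> free = new ArrayDeque<>();

           long[] acquire(int size) {
               long[] recycled = free.poll();
               return recycled != null ? recycled : new long[size];
           }

           void release(long[] buffer) {
               free.push(buffer);
           }
       }

       // Simulates a server that stores `batches` incoming batches of `rows` values each.
       static List<long[]> receive(int batches, int rows, boolean releaseAfterStore) {
           Pool pool = new Pool();
           List<long[]> stored = new ArrayList<>();
           for (int b = 0; b < batches; b++) {
               long[] buffer = pool.acquire(rows); // incoming batch lands in pooled memory
               for (int i = 0; i < rows; i++) {
                   buffer[i] = (long) b * rows + i;
               }
               stored.add(buffer); // kept for a later "DoGet"
               if (releaseAfterStore) {
                   pool.release(buffer); // BUG: memory is recycled while still referenced
               }
           }
           return stored;
       }

       // Concatenates the stored batches the way a client reading them back would see them.
       static long[] flatten(List<long[]> stored) {
           return stored.stream().flatMapToLong(Arrays::stream).toArray();
       }

       public static void main(String[] args) {
           System.out.println(Arrays.toString(flatten(receive(3, 2, true))));  // [4, 5, 4, 5, 4, 5]
           System.out.println(Arrays.toString(flatten(receive(3, 2, false)))); // [0, 1, 2, 3, 4, 5]
       }
   }
   ```

   In this model the fix matches the one described in the comment: do not release the stored batches in the put handler. Free them only once the data is no longer needed, presumably when it is deleted or the producer shuts down.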



[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138666767

   Or did you mean the `Producer`?
   



[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138599787

   @lidavidm Thanks for your response! Could you please elaborate on your answer or point out the mistake in the code above? I believe I am already doing what you suggested.



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138688141

   See https://github.com/apache/arrow-cookbook/pull/219



[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138664719

   @lidavidm This is how I create the server. Anything suspicious here?
   
   ```java
   package com.iamsmkr.arrowflight;
   
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.memory.BufferAllocator;
   
   import java.io.IOException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   public class ArrowFlightServer {
   
       private final BufferAllocator allocator;
       private final FlightServer flightServer;
       private volatile boolean started = false;
   
       public ArrowFlightServer(String address, int port, BufferAllocator allocator) {
           Location location = Location.forGrpcInsecure(address, port);
           this.allocator = allocator;
           this.flightServer =
                   FlightServer.builder(
                           allocator,
                           location,
                           new ArrowFlightProducer(allocator, location)
                   ).build();
   
           ExecutorService service = Executors.newCachedThreadPool();
           service.submit(() -> {
               try {
                   synchronized (flightServer) {
                       flightServer.start();
                       started = true;
                       System.out.println("ArrowFlight server started. Listening on port " + flightServer.getPort());
                       flightServer.notifyAll(); // wake every thread waiting in waitForServerToStart()
                   }
                   flightServer.awaitTermination();
               } catch (IOException e) {
                   System.out.println("Failed to start ArrowFlight server! " + e.getMessage());
                   e.printStackTrace();
               } catch (InterruptedException e) {
                   // Interrupted while awaiting termination; preserve the interrupt status
                   Thread.currentThread().interrupt();
               } finally {
                   close();
               }
           });
       }
   
       public void waitForServerToStart() {
           synchronized (flightServer) {
               while (!started) {
                   try {
                       flightServer.wait();
                   } catch (InterruptedException e) {
                       // Restore the interrupt flag and stop waiting
                       Thread.currentThread().interrupt();
                       return;
                   }
               }
           }
       }
   
       public void close() {
           try {
               flightServer.shutdown();
               allocator.close();
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
   }
   ```
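   The wait/notify handshake above works when startup succeeds. If `flightServer.start()` throws, however, `started` never becomes true, nothing calls `notify`, and `waitForServerToStart()` blocks forever. One alternative, swapped in here and not suggested anywhere in this thread, is a `CompletableFuture` that carries either the bound port or the startup failure. This sketch uses a hypothetical `Startable` interface in place of `FlightServer`, so it runs with the JDK alone. It omits the `awaitTermination()` call the original makes on the same thread.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Sketch: signal "started" or "failed" from a background thread with a CompletableFuture.
   public class StartSignalSketch {

       // Hypothetical stand-in for FlightServer.start(); returns the bound port or throws.
       interface Startable {
           int start() throws Exception;
       }

       private final CompletableFuture<Integer> port = new CompletableFuture<>();
       private final ExecutorService executor = Executors.newSingleThreadExecutor();

       StartSignalSketch(Startable server) {
           executor.submit(() -> {
               try {
                   port.complete(server.start());
               } catch (Exception e) {
                   port.completeExceptionally(e); // wakes waiters instead of leaving them blocked
               }
           });
       }

       // Blocks until startup finishes; a startup failure surfaces as an ExecutionException.
       int waitForServerToStart() throws InterruptedException, ExecutionException {
           return port.get();
       }

       void close() {
           executor.shutdownNow();
       }

       public static void main(String[] args) throws Exception {
           StartSignalSketch ok = new StartSignalSketch(() -> 33333);
           System.out.println("started on port " + ok.waitForServerToStart());
           ok.close();

           StartSignalSketch failing = new StartSignalSketch(() -> {
               throw new IOException("port already in use");
           });
           try {
               failing.waitForServerToStart();
           } catch (ExecutionException e) {
               System.out.println("failed: " + e.getCause().getMessage());
           }
           failing.close();
       }
   }
   ```

   With this design a caller cannot hang on a server that never started. It either gets the port or gets the original exception as the cause.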



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1145122321

   @iamsmkr please try the new example server out and see how that works.



[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1145280274

   @lidavidm Works just fine! Many thanks for your prompt fix.



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138658239

   I will put a PR up for the server soon. It doesn't appear to be the client, sorry.



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138600724

   Use indices 0 to 10 every time, not 0 to 10, then 10 to 20, and so on. That is what I mean.
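   The advice is to refill the same row positions (indices 0 through 9 for a 10-row batch) before each `putNext()`, because the root is reused. In the model below, only the first `rowCount` rows are sent. The reporter's code already resets `j` to 0, and later in the thread this turns out not to be the cause. This JDK-only model uses a hypothetical `BatchFillModel` class standing in for a reusable `VectorSchemaRoot` with one BIGINT column. Its `setSafe` and `putNext` mimic the roles of the Arrow methods with those names but are not the Arrow API.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical stand-in for a reusable VectorSchemaRoot with a single BIGINT column.
   public class BatchFillModel {
       private long[] values;
       private int rowCount;

       BatchFillModel(int initialCapacity) {
           values = new long[initialCapacity];
       }

       // Like BigIntVector.setSafe: grows the buffer when the index is past the end.
       void setSafe(int index, long value) {
           if (index >= values.length) {
               values = Arrays.copyOf(values, Math.max(index + 1, values.length * 2));
           }
           values[index] = value;
       }

       void setRowCount(int rowCount) {
           this.rowCount = rowCount;
       }

       // Like putNext: only rows [0, rowCount) are copied out and "sent".
       long[] putNext() {
           return Arrays.copyOf(values, rowCount);
       }

       // Sends `batches` batches; batch b carries the values b*rows .. b*rows + rows - 1.
       static List<long[]> send(int batches, int rows, boolean restartAtZero) {
           BatchFillModel root = new BatchFillModel(rows);
           List<long[]> sent = new ArrayList<>();
           for (int b = 0; b < batches; b++) {
               for (int i = 0; i < rows; i++) {
                   int index = restartAtZero ? i : b * rows + i; // the only difference
                   root.setSafe(index, (long) b * rows + i);
               }
               root.setRowCount(rows);
               sent.add(root.putNext());
           }
           return sent;
       }

       public static void main(String[] args) {
           for (long[] batch : send(3, 2, true)) System.out.print(Arrays.toString(batch));  // [0, 1][2, 3][4, 5]
           System.out.println();
           for (long[] batch : send(3, 2, false)) System.out.print(Arrays.toString(batch)); // [0, 1][0, 1][0, 1]
           System.out.println();
       }
   }
   ```

   In this model, writing at indices 10 to 19 while calling `setRowCount(10)` re-sends the stale first rows. That is why each batch should start again at index 0.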



[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138652539

   You are right: there is effectively a use-after-free in the example server. I'll have a PR up soon.

