Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/26 12:28:22 UTC

[GitHub] [arrow] iamsmkr opened a new issue, #13242: Cookbook Java example does not work as expected with multiple batches

iamsmkr opened a new issue, #13242:
URL: https://github.com/apache/arrow/issues/13242

   I am trying out the Cookbook Java example [here](https://arrow.apache.org/cookbook/java/flight.html). The only change is that I write multiple batches; see the "Batch" comments in the code.
   
   When I run this example I see unexpected **overlapping results**: the client receives the values 30 to 59 twice instead of 0 to 59 once. It gets weirder with multi-threading.
   What is the correct way to send multiple batches?
   
   ```
   S1: Server (Location): Listening on port 33333
   C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
   WARNING: An illegal reflective access operation has occurred
   WARNING: Illegal reflective access by org.apache.arrow.memory.util.MemoryUtil (file:/Users/rentsher/.m2/repository/org/apache/arrow/arrow-memory-core/8.0.0/arrow-memory-core-8.0.0.jar) to field java.nio.Buffer.address
   WARNING: Please consider reporting this to the maintainers of org.apache.arrow.memory.util.MemoryUtil
   WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
   WARNING: All illegal access operations will be denied in a future release
   C2: Client (Populate Data): Wrote 2 batches with 3 rows each
   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
   C4: Client (Get Stream):
   Client Received batch #1, Data:
   vector size: 10
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   Client Received batch #2, Data:
   vector size: 10
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   Client Received batch #3, Data:
   vector size: 10
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   Client Received batch #4, Data:
   vector size: 10
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   Client Received batch #5, Data:
   vector size: 10
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   Client Received batch #6, Data:
   vector size: 10
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
   C6: Client (Do Delete Action): Delete completed
   C7: Client (List Flights Info): After delete - No records
   C8: Server shut down successfully
   
   Process finished with exit code 0
   ```
   
   ```java
   package com.iamsmkr.arrowflight;
   
   import org.apache.arrow.flight.Action;
   import org.apache.arrow.flight.AsyncPutListener;
   import org.apache.arrow.flight.Criteria;
   import org.apache.arrow.flight.FlightClient;
   import org.apache.arrow.flight.FlightDescriptor;
   import org.apache.arrow.flight.FlightInfo;
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.FlightStream;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.flight.Result;
   import org.apache.arrow.flight.Ticket;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.BigIntVector;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.holders.NullableVarCharHolder;
   import org.apache.arrow.vector.types.pojo.ArrowType;
   import org.apache.arrow.vector.types.pojo.Field;
   import org.apache.arrow.vector.types.pojo.FieldType;
   import org.apache.arrow.vector.types.pojo.Schema;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.Iterator;
   
   public class CookbookApp {
   
       public static void main(String[] args) {
   
           Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
           try (BufferAllocator allocator = new RootAllocator()) {
               // Server
               try (FlightServer flightServer = FlightServer.builder(allocator, location, new ArrowFlightProducer(allocator, location)).build()) {
                   try {
                       flightServer.start();
                       System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
                   } catch (IOException e) {
                       System.exit(1);
                   }
   
                   // Client
                   try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
                       System.out.println("C1: Client (Location): Connected to " + location.getUri());
   
                       // Populate data
                       Schema schema = new Schema(Arrays.asList(
                               new Field("name", new FieldType(false, new ArrowType.Int(64, true), null), null)));
   
                       try (
                               VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                               BigIntVector names = (BigIntVector) vectorSchemaRoot.getVector("name")
                       ) {
                           FlightClient.ClientStreamListener listener =
                                   flightClient.startPut(
                                           FlightDescriptor.path("profiles"),
                                           vectorSchemaRoot,
                                           new AsyncPutListener()
                                   );
   
                           // Batch 1
                           int j = 0;
                           for (long i = 0; i < 10; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 2
                           j = 0;
                           for (long i = 10; i < 20; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 3
                           j = 0;
                           for (long i = 20; i < 30; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 4
                           j = 0;
                           for (long i = 30; i < 40; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 5
                           j = 0;
                           for (long i = 40; i < 50; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           // Batch 6
                           j = 0;
                           for (long i = 50; i < 60; i++) {
                               names.setSafe(j, i);
                               j++;
                           }
                           vectorSchemaRoot.setRowCount(10);
   
                           while (!listener.isReady()) {
                               try {
                                   Thread.sleep(1);
                               } catch (InterruptedException e) {
                                   e.printStackTrace();
                               }
                           }
   
                           listener.putNext();
   
                           listener.completed();
                           listener.getResult();
                           
                           System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
                       }
   
                       // Get metadata information
                       FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
                       System.out.println("C3: Client (Get Metadata): " + flightInfo);
   
                       // Get data information
                       try (FlightStream flightStream = flightClient.getStream(new Ticket(
                               FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) {
                           int batch = 0;
                           try (
                                   VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot();
                                   BigIntVector names = (BigIntVector) vectorSchemaRootReceived.getVector("name")
                           ) {
                               System.out.println("C4: Client (Get Stream):");
                               while (flightStream.next()) {
                                   batch++;
                                   System.out.println("Client Received batch #" + batch + ", Data:");
   //                                System.out.print(vectorSchemaRootReceived.contentToTSVString());
                                   int i = vectorSchemaRootReceived.getRowCount();
                                   System.out.println("vector size: " + i);
                                   int j = 0;
                                   while (j < i) {
                                       System.out.println(names.get(j));
   //                                    names.get(j);
   //                                    copy(vcHolder, tmpSB);
   //                                    System.out.println("name" + j + ": " + tmpSB);
                                       j++;
                                   }
                               }
                           }
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
   
                       // Get all metadata information
                       Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
                       System.out.print("C5: Client (List Flights Info): ");
                       flightInfosBefore.forEach(t -> System.out.println(t));
   
                       // Do delete action
                       Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
                               FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
                       while (deleteActionResult.hasNext()) {
                           Result result = deleteActionResult.next();
                           System.out.println("C6: Client (Do Delete Action): " +
                                   new String(result.getBody(), StandardCharsets.UTF_8));
                       }
   
                       // Get all metadata information (to validate delete action)
                       Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
                       flightInfos.forEach(System.out::println);
                       System.out.println("C7: Client (List Flights Info): After delete - No records");
   
                       // Server shut down
                       flightServer.shutdown();
                       System.out.println("C8: Server shut down successfully");
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on issue #13242: Cookbook Java example does not work as expected with multiple batches

Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #13242:
URL: https://github.com/apache/arrow/issues/13242#issuecomment-1138533992

   A VectorSchemaRoot is mutable, so each new batch should reuse the same indices (starting again from 0) rather than writing to higher indices.
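
The index-reuse pattern described above can be sketched in plain Java. This is only a model and does not use the Arrow API: `ReusableColumn` and `send()` are hypothetical stand-ins for a mutable `VectorSchemaRoot` column and for `listener.putNext()`. In particular, the copy made inside `send()` is an assumption of the sketch; this thread does not say whether Flight copies or references the buffers at that point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the advice above. It does NOT use the Arrow API:
// ReusableColumn and send() are hypothetical stand-ins for a mutable
// VectorSchemaRoot column and for listener.putNext().
public class BatchReuseSketch {

    /** Hypothetical stand-in for one mutable column plus its row count. */
    static final class ReusableColumn {
        final long[] values;
        int rowCount;

        ReusableColumn(int capacity) {
            this.values = new long[capacity];
        }
    }

    /** Hypothetical stand-in for putNext(): copies the first rowCount values. */
    static long[] send(ReusableColumn column) {
        return Arrays.copyOf(column.values, column.rowCount);
    }

    /** Writes batchCount batches through a single reused column. */
    static List<long[]> writeBatches(int batchCount, int rowsPerBatch) {
        ReusableColumn column = new ReusableColumn(rowsPerBatch);
        List<long[]> sent = new ArrayList<>();
        long next = 0;
        for (int batch = 0; batch < batchCount; batch++) {
            // Every batch starts again at index 0 of the same buffer.
            for (int row = 0; row < rowsPerBatch; row++) {
                column.values[row] = next++;
            }
            column.rowCount = rowsPerBatch;
            sent.add(send(column));
        }
        return sent;
    }

    public static void main(String[] args) {
        // Prints [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9, 10, 11].
        for (long[] batch : writeBatches(3, 4)) {
            System.out.println(Arrays.toString(batch));
        }
    }
}
```

The loop replaces six copy-pasted "Batch N" sections with one body, which makes it easier to see that only the values change between batches while the indices and the row count stay the same.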

