Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/26 12:39:22 UTC
[GitHub] [arrow-cookbook] iamsmkr opened a new issue, #218: Cookbook Java example does not work as expected with multiple batches
iamsmkr opened a new issue, #218:
URL: https://github.com/apache/arrow-cookbook/issues/218
I am trying out the cookbook Java example [here](https://arrow.apache.org/cookbook/java/flight.html). The only change is that I am trying to write multiple batches. See the "Batch" comments in the code.
Upon running this example I am seeing unexpected **overlapping results**!! This gets weirder with multi-threading.
Please suggest the correct way of sending multiple batches!
```
S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.arrow.memory.util.MemoryUtil (file:/Users/rentsher/.m2/repository/org/apache/arrow/arrow-memory-core/8.0.0/arrow-memory-core-8.0.0.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.arrow.memory.util.MemoryUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
C2: Client (Populate Data): Wrote 2 batches with 3 rows each
C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
C4: Client (Get Stream):
Client Received batch #1, Data:
vector size: 10
30
31
32
33
34
35
36
37
38
39
Client Received batch #2, Data:
vector size: 10
40
41
42
43
44
45
46
47
48
49
Client Received batch #3, Data:
vector size: 10
50
51
52
53
54
55
56
57
58
59
Client Received batch #4, Data:
vector size: 10
30
31
32
33
34
35
36
37
38
39
Client Received batch #5, Data:
vector size: 10
40
41
42
43
44
45
46
47
48
49
Client Received batch #6, Data:
vector size: 10
50
51
52
53
54
55
56
57
58
59
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Int(64, true) not null>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=60}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully
Process finished with exit code 0
```
```java
package com.iamsmkr.arrowflight;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
public class CookbookApp {
    public static void main(String[] args) {
        Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
        try (BufferAllocator allocator = new RootAllocator()) {
            // Server
            try (FlightServer flightServer = FlightServer.builder(allocator, location, new ArrowFlightProducer(allocator, location)).build()) {
                try {
                    flightServer.start();
                    System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
                } catch (IOException e) {
                    System.exit(1);
                }
                // Client
                try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
                    System.out.println("C1: Client (Location): Connected to " + location.getUri());
                    // Populate data
                    Schema schema = new Schema(Arrays.asList(
                            new Field("name", new FieldType(false, new ArrowType.Int(64, true), null), null)));
                    try (
                            VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
                            BigIntVector names = (BigIntVector) vectorSchemaRoot.getVector("name")
                    ) {
                        FlightClient.ClientStreamListener listener =
                                flightClient.startPut(
                                        FlightDescriptor.path("profiles"),
                                        vectorSchemaRoot,
                                        new AsyncPutListener()
                                );
                        // Batch 1
                        int j = 0;
                        for (long i = 0; i < 10; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();
                        // Batch 2
                        j = 0;
                        for (long i = 10; i < 20; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();
                        // Batch 3
                        j = 0;
                        for (long i = 20; i < 30; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();
                        // Batch 4
                        j = 0;
                        for (long i = 30; i < 40; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();
                        // Batch 5
                        j = 0;
                        for (long i = 40; i < 50; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();
                        // Batch 6
                        j = 0;
                        for (long i = 50; i < 60; i++) {
                            names.setSafe(j, i);
                            j++;
                        }
                        vectorSchemaRoot.setRowCount(10);
                        while (!listener.isReady()) {
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        listener.putNext();
                        listener.completed();
                        listener.getResult();
                        System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
                    }
                    // Get metadata information
                    FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
                    System.out.println("C3: Client (Get Metadata): " + flightInfo);
                    // Get data information
                    try (FlightStream flightStream = flightClient.getStream(new Ticket(
                            FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) {
                        int batch = 0;
                        try (
                                VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot();
                                BigIntVector names = (BigIntVector) vectorSchemaRootReceived.getVector("name")
                        ) {
                            System.out.println("C4: Client (Get Stream):");
                            while (flightStream.next()) {
                                batch++;
                                System.out.println("Client Received batch #" + batch + ", Data:");
                                // System.out.print(vectorSchemaRootReceived.contentToTSVString());
                                int i = vectorSchemaRootReceived.getRowCount();
                                System.out.println("vector size: " + i);
                                int j = 0;
                                while (j < i) {
                                    System.out.println(names.get(j));
                                    // names.get(j);
                                    // copy(vcHolder, tmpSB);
                                    // System.out.println("name" + j + ": " + tmpSB);
                                    j++;
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // Get all metadata information
                    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
                    System.out.print("C5: Client (List Flights Info): ");
                    flightInfosBefore.forEach(t -> System.out.println(t));
                    // Do delete action
                    Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
                            FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
                    while (deleteActionResult.hasNext()) {
                        Result result = deleteActionResult.next();
                        System.out.println("C6: Client (Do Delete Action): " +
                                new String(result.getBody(), StandardCharsets.UTF_8));
                    }
                    // Get all metadata information (to validate delete action)
                    Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
                    flightInfos.forEach(System.out::println);
                    System.out.println("C7: Client (List Flights Info): After delete - No records");
                    // Server shut down
                    flightServer.shutdown();
                    System.out.println("C8: Server shut down successfully");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-cookbook] lidavidm closed issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm closed issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
URL: https://github.com/apache/arrow-cookbook/issues/218
[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138657348
@lidavidm What's wrong in the code above? How could I fix this in my code?
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138601029
Oh wait, I misread the code. Let me take another look…
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138668166
I will put up a PR that fixes the server in the cookbook. The server is closing the ArrowRecordBatch in acceptPut when it should not be.
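To make the failure mode concrete, here is a minimal plain-Java sketch of what releasing a buffer that is still referenced can look like. It is only an analogy under stated assumptions: `ReleasedBatchSketch`, `Pool`, and `receive` are hypothetical names, a `long[]` stands in for the batch's memory, and none of this is Arrow's allocator or the cookbook producer's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReleasedBatchSketch {
    /** Tiny free-list pool (hypothetical): a released array is handed out again by the next acquire. */
    static final class Pool {
        private final ArrayDeque<long[]> free = new ArrayDeque<>();
        private final int size;

        Pool(int size) {
            this.size = size;
        }

        long[] acquire() {
            long[] reused = free.poll();            // null when nothing has been released yet
            return reused != null ? reused : new long[size];
        }

        void release(long[] buffer) {
            free.push(buffer);
        }
    }

    /** Stores each incoming batch; optionally releases it right away while it is still stored. */
    static List<long[]> receive(int batchCount, int batchSize, boolean releaseAfterStore) {
        Pool pool = new Pool(batchSize);
        List<long[]> stored = new ArrayList<>();
        for (int b = 0; b < batchCount; b++) {
            long[] batch = pool.acquire();
            for (int j = 0; j < batchSize; j++) {
                batch[j] = (long) b * batchSize + j;
            }
            stored.add(batch);
            if (releaseAfterStore) {
                // Analogy for the bug: the buffer goes back to the pool while 'stored' still
                // points at it, so the next batch overwrites data that was already stored.
                pool.release(batch);
            }
        }
        return stored;
    }

    public static void main(String[] args) {
        for (boolean release : new boolean[] {true, false}) {
            System.out.println("releaseAfterStore=" + release);
            for (long[] batch : receive(3, 2, release)) {
                System.out.println("  " + Arrays.toString(batch));
            }
        }
    }
}
```

With `releaseAfterStore` set to `true`, every stored entry ends up pointing at the same reused array, so earlier batches appear to contain later data. With `false`, each batch keeps its own memory and the stored values stay distinct.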
[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138666767
Or did you mean the `Producer`?
[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138599787
@lidavidm Thanks for your response! Could you please elaborate on your answer or point out the mistake in the code above? I believe I am already doing what you have suggested.
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138688141
See https://github.com/apache/arrow-cookbook/pull/219
[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138664719
@lidavidm This is how I create the server. Anything suspicious here?
```java
package com.iamsmkr.arrowflight;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ArrowFlightServer {
    private final BufferAllocator allocator;
    private final FlightServer flightServer;
    private volatile boolean started = false;

    public ArrowFlightServer(String address, int port, BufferAllocator allocator) {
        Location location = Location.forGrpcInsecure(address, port);
        this.allocator = allocator;
        this.flightServer =
                FlightServer.builder(
                        allocator,
                        location,
                        new ArrowFlightProducer(allocator, location)
                ).build();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(() -> {
            try {
                synchronized (flightServer) {
                    flightServer.start();
                    started = true;
                    System.out.println("ArrowFlight server started. Listening on port " + flightServer.getPort());
                    flightServer.notify();
                }
                flightServer.awaitTermination();
            } catch (IOException e) {
                System.out.println("Failed to start ArrowFlight server! " + e.getMessage());
                e.printStackTrace();
            } catch (InterruptedException e) {
                // e.printStackTrace();
            } finally {
                close();
            }
        });
    }

    public void waitForServerToStart() {
        synchronized (flightServer) {
            while (!started) {
                try {
                    flightServer.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void close() {
        try {
            flightServer.shutdown();
            allocator.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1145122321
@iamsmkr please try the new example server out and see how that works.
[GitHub] [arrow-cookbook] iamsmkr commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
iamsmkr commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1145280274
@lidavidm Works just fine! Many thanks for your prompt fix.
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138658239
I will put a PR up for the server soon. It doesn't appear to be the client, sorry.
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138600724
Use indices 0 to 10 every time, not 0 to 10, 10 to 20, ... that is what I mean.
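As a rough illustration of that indexing rule, here is a plain-Java sketch with no Arrow dependency. The names `BatchIndexSketch`, `fillBatch`, and `writeBatches` are hypothetical, a `long[]` stands in for the reused vector, and cloning the array stands in for sending the batch. The point is only that the position inside the batch restarts at 0 each time while the stored values keep increasing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchIndexSketch {
    /** Overwrites slots 0..buffer.length-1 with the values belonging to the given batch. */
    static void fillBatch(long[] buffer, int batchNumber) {
        long firstValue = (long) batchNumber * buffer.length;
        for (int j = 0; j < buffer.length; j++) {
            // j is the index inside the batch and restarts at 0 for every batch;
            // only the value written at that index keeps growing.
            buffer[j] = firstValue + j;
        }
    }

    /** Fills the same buffer batchCount times and records a snapshot after each fill. */
    static List<long[]> writeBatches(int batchCount, int batchSize) {
        long[] buffer = new long[batchSize];      // stand-in for the reused vector
        List<long[]> sent = new ArrayList<>();
        for (int b = 0; b < batchCount; b++) {
            fillBatch(buffer, b);
            sent.add(buffer.clone());             // stand-in for sending the batch
        }
        return sent;
    }

    public static void main(String[] args) {
        for (long[] batch : writeBatches(6, 10)) {
            System.out.println(Arrays.toString(batch));
        }
    }
}
```

A loop like this could also replace the six copy-pasted batch blocks in a client, assuming each batch has the same size.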
[GitHub] [arrow-cookbook] lidavidm commented on issue #218: [Java] Cookbook Java example does not work as expected with multiple batches
Posted by GitBox <gi...@apache.org>.
lidavidm commented on issue #218:
URL: https://github.com/apache/arrow-cookbook/issues/218#issuecomment-1138652539
You are right, there is effectively a use-after-free in the example server. I'll have a PR up soon.