You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@avro.apache.org by "Victor (Jira)" <ji...@apache.org> on 2023/06/09 06:56:00 UTC

[jira] [Comment Edited] (AVRO-3768) Provide InputStream implementation that wraps an Iterator of Record

    [ https://issues.apache.org/jira/browse/AVRO-3768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730633#comment-17730633 ] 

Victor edited comment on AVRO-3768 at 6/9/23 6:55 AM:
------------------------------------------------------

Hi Christophe, thank you for looking into my use case :)

I think it is a bit simpler than this, I will directly share my exact need, I may have abstracted it a bit too much in the ticket description.

I have a system that can receive avro records via http and that deserialize them using the DataFileStream available from the avro library to put them in a db. This is all working perfectly well.

Now I want to write a test to verify it's working as expected without exploding the memory and it is as fast as I want it, so in my test, I want to be able to write something like this:
{code:java}
Schema recordSchema = SchemaBuilder.record("Row").fields().name("value").type().intType().endRecord();
// this could be anything we want (e.g., a Stream, an Iterator, or maybe an avro-provided interface),
// the only important thing is that it generates record one by one and not everything at once because there are a lot of them
Iterator<GenericData.Record> iterator = Stream.generate(() -> {
  var record = new GenericData.Record(recordSchema);
  record.put("value", Random.nextInt(100));
  return record;
}).limit(10_000_000).iterator();
httpClient.post(URL, new AvroInputStream(iterator));
{code}
Basically, I would like to have AvroInputStream that when its read() is called, it will take a record, serialize it in a buffer in memory and return the bytes and so on until there is no more record to read.

Right now, I am forced to write this:
{code:java}
var pipedOutput = new PipedOutputStream();
var pipedInput = new PipedInputStream(pipedOutput);

new Thread(() -> {
    try (var w = new DataFileWriter<>(new GenericDatumWriter<>(recordSchema)).create(recordSchema, pipedOutput)) {
        while (iterator.hasNext()) {
            it.append(rows.next())
        }
    }
}).start();

httpClient.post(URL, pipedInput);{code}
Which is a bit more complex in terms of code and also is using more resources than needed because theoretically we don't really need a separate thread like this.

I hope this makes sense :)


was (Author: victor.noel):
Hi Christophe, thank you for looking into my use case :)

I think it is a bit simpler than this, I will directly share my exact need, I may have abstracted it a bit too much in the ticket description.

I have a system that can receive avro records via http and that deserialize them using the DataFileStream available from the avro library to put them in a db. This is all working perfectly well.

Now I want to write a test to verify it's working as expected without exploding the memory and it is as fast as I want it, so in my test, I want to be able to write something like this:
{code:java}
Schema recordSchema = SchemaBuilder.record("Row").fields().name("value").type().intType().endRecord();
// this could be anything we want (e.g., a Stream, an Iterator, or maybe an avro-provided interface),
// the only important thing is that it generates record one by one and not everything at once because there are a lot of them
Iterator<GenericData.Record> iterator = Stream.generate(() -> {
  var record = new GenericData.Record(recordSchema);
  record.put("value", Random.nextInt(100));
  return record;
}).limit(10_000_000).iterator();
httpClient.post(URL, new AvroInputStream(iterator));
{code}
Basically, I would like to have AvroInputStream that when its read() is called, it will take a record, serialize it in a buffer in memory and send it and so on until there is no more record to read.

Right now, I am forced to write this:
{code:java}
var pipedOutput = new PipedOutputStream();
var pipedInput = new PipedInputStream(pipedOutput);

new Thread(() -> {
    try (var w = new DataFileWriter<>(new GenericDatumWriter<>(recordSchema)).create(recordSchema, pipedOutput)) {
        while (iterator.hasNext()) {
            it.append(rows.next())
        }
    }
}).start();

httpClient.post(URL, pipedInput);{code}
Which is a bit more complex in terms of code and also is using more resources than needed because theoretically we don't really need a separate thread like this.

I hope this makes sense :)

> Provide InputStream implementation that wraps an Iterator of Record
> -------------------------------------------------------------------
>
>                 Key: AVRO-3768
>                 URL: https://issues.apache.org/jira/browse/AVRO-3768
>             Project: Apache Avro
>          Issue Type: New Feature
>          Components: java
>            Reporter: Victor
>            Priority: Major
>
> Hello,
> I have some code that generates avro record (using GenericData.Record precisely) record per record (I used an Iterator<GenericData.Record> in practice, but we could imagine anything similar including an actual java.util.Stream or even an avro-provided interface), and I would like to serialized it to some external system (an http request in my particular case). So basically the data is generated as it is pulled.
> Right now, the only option I found is to use a combination of java.io.PipedInputStream and PipedOutputStream, wrapping the later inside a DataFileWriter and then feeding the records to the writer (in a separate thread) so that the PipedInputStream can be read by anything else.
> As you can see this is a bit cumbersome and a more straightforward approach would be welcomed. I tried to implement this myself but I admit I got lost in all the moving pieces and I couldn't find a simple way of doing that without implementing a lot of low-level stuff. So maybe I'm missing something or we could add something to avro :)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)