Posted to issues@avro.apache.org by "Christophe Le Saec (Jira)" <ji...@apache.org> on 2023/06/12 12:46:00 UTC
[jira] [Commented] (AVRO-2916) add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to non-local file
[ https://issues.apache.org/jira/browse/AVRO-2916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731593#comment-17731593 ]
Christophe Le Saec commented on AVRO-2916:
------------------------------------------
For Azure, couldn't you use the "CloudBlockBlob.openOutputStream()" method to get an output stream directly, instead of going through an intermediate ByteArrayOutputStream?
{code:java}
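DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
// Declare the stream first: try-with-resources closes in reverse order, so the
// writer is closed (and flushed) while the stream is still open.
try (OutputStream out = blob.openOutputStream();
     DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
  dataFileWriter.setCodec(CodecFactory.nullCodec());
  // "in" is assumed to be a SeekableInput over the existing file's header
  dataFileWriter.appendTo(in, out); // call only once
  for(;;) {
    User userToAppend = ...
    dataFileWriter.append(userToAppend);
  }
}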
{code}
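One detail worth checking in a snippet like the one above is the order in which the two resources are declared. Java's try-with-resources closes resources in the reverse of their declaration order, and DataFileWriter.close() flushes the pending block to the underlying stream, so the stream presumably needs to be declared before the writer. The sketch below only illustrates the closing order with hypothetical stand-in classes (CloseOrderSketch and Named are not part of Avro or the Azure SDK):

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderSketch {
    // Records what happened, in order.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for any closeable resource (stream, writer, ...).
    static class Named implements AutoCloseable {
        final String name;

        Named(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            events.add("close " + name);
        }
    }

    static List<String> run() {
        events.clear();
        // "stream" is declared first, so it is closed last.
        try (Named stream = new Named("stream");
             Named writer = new Named("writer")) {
            events.add("body");
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [body, close writer, close stream]
    }
}
```

With the stream declared first, the writer is closed while the stream is still open, so its final flush has somewhere to go.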
> add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to non-local file
> --------------------------------------------------------------------------------------------
>
> Key: AVRO-2916
> URL: https://issues.apache.org/jira/browse/AVRO-2916
> Project: Apache Avro
> Issue Type: Improvement
> Components: java
> Affects Versions: 1.11.0
> Reporter: Arnaud Nauwynck
> Priority: Trivial
> Labels: pull-request-available
> Time Spent: 50m
> Remaining Estimate: 0h
>
> It is not practical to append records to a remote data file (Azure blob, AWS, ...) when java.io.File cannot be used and the records must instead be serialized into an in-memory byte array that is then appended to the existing remote data.
> The proposal is simply to add an equivalent method "DataFileWriter.appendTo(Header, OutputStream)" as follows:
> {code:java}
> /**
>  * Open a writer appending to an existing stream.
>  *
>  * @param header the header from the existing data to append.
>  * @param out positioned at the end of the existing file.
>  */
> public DataFileWriter<D> appendTo(Header header, OutputStream out) throws IOException {
>   assertNotOpen();
>   this.schema = header.schema;
>   this.sync = header.sync;
>   this.meta.putAll(header.meta);
>   byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
>   if (codecBytes != null) {
>     String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
>     this.codec = CodecFactory.fromString(strCodec).createInstance();
>   } else {
>     this.codec = CodecFactory.nullCodec().createInstance();
>   }
>   init(out);
>   return this;
> }
> {code}
> in addition to the similar existing method:
> {code:java}
> public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOException {
>   assertNotOpen();
>   DataFileReader<D> reader = new DataFileReader<>(in, new GenericDatumReader<>());
>   this.schema = reader.getSchema();
>   this.sync = reader.getHeader().sync;
>   this.meta.putAll(reader.getHeader().meta);
>   byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
>   if (codecBytes != null) {
>     String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
>     this.codec = CodecFactory.fromString(strCodec).createInstance();
>   } else {
>     this.codec = CodecFactory.nullCodec().createInstance();
>   }
>   init(out);
>   return this;
> }
> {code}
> Technically, we could call "DataFileWriter.appendTo(seekableInput, output)", but it is both complex and inefficient to pass a "seekableInput" that wraps just the header fragment of an existing file.
> {code:java}
> byte[] inArrayHeader = ... fetch once the header of a remote file...
> User userToAppend = ...
>
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
> try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
>   dataFileWriter.setCodec(CodecFactory.nullCodec());
>   try (SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader)) {
>     dataFileWriter.appendTo(in, out); // ... inefficient: will reparse header schema,sync,meta each time!
>   }
>
>   dataFileWriter.append(userToAppend);
> }
> byte[] serializedBytes = out.toByteArray();
> // then use serializedBytes to append to remote file (azure blob, aws..)
> {code}
> Using the newly proposed helper method, you could compute the data block to append to a remote data file more simply and efficiently:
> {code:java}
> // run once at startup
> Header header;
> {
>   SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
>   DataFileReader<Object> reader = new DataFileReader<>(in, new GenericDatumReader<>());
>   header = reader.getHeader();
>   reader.close();
> }
> // streaming code to append+flush rows to remote data
> for(;;) {
>   User userToAppend = ...
>   ByteArrayOutputStream out = new ByteArrayOutputStream();
>   DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
>   try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
>     dataFileWriter.setCodec(CodecFactory.nullCodec());
>     dataFileWriter.appendTo(header, out); // efficient: no reparse schema,sync,meta
>
>     dataFileWriter.append(userToAppend);
>   }
>   byte[] serializedBytes = out.toByteArray();
>   // then use serializedBytes to append to remote file (azure blob, aws..)
>   ... remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
> }
> {code}
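For context on why the appended bytes can simply be concatenated to the remote file: per the Avro specification, an object container file is a header followed by data blocks, and each block consists of the object count, the byte size of the serialized objects, the serialized objects themselves, and the file's 16-byte sync marker. The counts are written as zig-zag variable-length longs. The sketch below frames such a block by hand using only the JDK. It is an illustration of the layout under the null codec, not Avro's implementation; the class name AvroBlockSketch and the placeholder record bytes are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

class AvroBlockSketch {
    /** Writes a long using zig-zag variable-length encoding, as Avro does for longs. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes map to small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z);
    }

    /** Frames already-serialized records as one block: count, size, payload, sync marker. */
    static byte[] frameBlock(List<byte[]> records, byte[] sync) throws IOException {
        if (sync.length != 16) {
            throw new IllegalArgumentException("sync marker must be 16 bytes");
        }
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        for (byte[] record : records) {
            payload.write(record);
        }
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        writeLong(block, records.size()); // number of objects in the block
        writeLong(block, payload.size()); // size in bytes of the serialized objects
        payload.writeTo(block);
        block.write(sync);                // must match the sync marker in the file header
        return block.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] sync = new byte[16];       // placeholder; a real one comes from the file header
        Arrays.fill(sync, (byte) 7);
        // Placeholder bytes standing in for two binary-encoded records.
        List<byte[]> records = Arrays.asList(new byte[] {1, 2, 3}, new byte[] {4, 5});
        System.out.println(frameBlock(records, sync).length); // prints 23
    }
}
```

This is also why the sync marker from the existing header has to be reused: a reader treats the appended bytes as valid blocks only if each one ends with the marker declared in the header.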