Posted to issues@avro.apache.org by "Christophe Le Saec (Jira)" <ji...@apache.org> on 2023/06/12 12:46:00 UTC

[jira] [Commented] (AVRO-2916) add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to non-local file

    [ https://issues.apache.org/jira/browse/AVRO-2916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731593#comment-17731593 ] 

Christophe Le Saec commented on AVRO-2916:
------------------------------------------

For Azure, couldn't you use the CloudBlockBlob.openOutputStream() method to get a direct output stream, instead of going through an intermediary ByteArrayOutputStream?
 
{code:java}
DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
SeekableInput in = ... // seekable view of the existing remote file (see sketch below)
try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer);
    OutputStream out = blob.openOutputStream()) {
  dataFileWriter.setCodec(CodecFactory.nullCodec());
  dataFileWriter.appendTo(in, out); // call only once

  for (;;) {
    User userToAppend = ...
    dataFileWriter.append(userToAppend);
  }
}
{code}
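
For reference, here is a minimal sketch of how the "blob" and "in" variables above might be obtained, assuming the legacy com.microsoft.azure:azure-storage SDK; the connection string, container and blob names are placeholders:

{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.file.SeekableInput;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

// Resolve the target blob (checked exceptions omitted for brevity).
CloudStorageAccount account = CloudStorageAccount.parse(connectionString);
CloudBlockBlob blob = account.createCloudBlobClient()
    .getContainerReference("my-container")
    .getBlockBlobReference("users.avro");

// Download the existing file once so that appendTo(SeekableInput, OutputStream)
// can read its header (schema, sync marker, metadata) before appending.
ByteArrayOutputStream existing = new ByteArrayOutputStream();
blob.download(existing);
SeekableInput in = new SeekableByteArrayInput(existing.toByteArray());
{code}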

> add DataFileWriter.appendTo(Header,OutputStream) to be able to append data to non-local file
> --------------------------------------------------------------------------------------------
>
>                 Key: AVRO-2916
>                 URL: https://issues.apache.org/jira/browse/AVRO-2916
>             Project: Apache Avro
>          Issue Type: Improvement
>          Components: java
>    Affects Versions: 1.11.0
>            Reporter: Arnaud Nauwynck
>            Priority: Trivial
>              Labels: pull-request-available
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> It is not practical to append records to a remote DataFile (Azure blob, AWS, ...): you cannot use a java.io.File, so you have to build an in-memory byte array and append it to the existing remote data yourself.
> The proposal is simply to add an equivalent method "DataFileWriter.appendTo(Header, OutputStream)", as follows:
> {code:java}
>   /**
>    * Open a writer appending to an existing stream.
>    *
>    * @param header the header of the existing file being appended to.
>    * @param out positioned at the end of the existing file.
>    */
>   public DataFileWriter<D> appendTo(Header header, OutputStream out) throws IOException {
>     assertNotOpen();
>     this.schema = header.schema;
>     this.sync = header.sync;
>     this.meta.putAll(header.meta);
>     byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
>     if (codecBytes != null) {
>       String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
>       this.codec = CodecFactory.fromString(strCodec).createInstance();
>     } else {
>       this.codec = CodecFactory.nullCodec().createInstance();
>     }
>     init(out);
>     return this;
>   }
> {code} 
> in addition to the similar existing method:
>  {code:java}
>   public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOException {
>     assertNotOpen();
>     DataFileReader<D> reader = new DataFileReader<>(in, new GenericDatumReader<>());
>     this.schema = reader.getSchema();
>     this.sync = reader.getHeader().sync;
>     this.meta.putAll(reader.getHeader().meta);
>     byte[] codecBytes = this.meta.get(DataFileConstants.CODEC);
>     if (codecBytes != null) {
>       String strCodec = new String(codecBytes, StandardCharsets.UTF_8);
>       this.codec = CodecFactory.fromString(strCodec).createInstance();
>     } else {
>       this.codec = CodecFactory.nullCodec().createInstance();
>     }
>     init(out);
>     return this;
>   }
>  {code}
> Technically, we could call "DataFileWriter.appendTo(seekableInput, output)", but it is both complex and inefficient to wrap the previously fetched header bytes in a "seekableInput" on every append:
>  {code:java}
> byte[] inArrayHeader = ... fetch once the header of a remote file...
> User userToAppend = ...
>
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
> try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
>   dataFileWriter.setCodec(CodecFactory.nullCodec());
>   try (SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader)) {
>     dataFileWriter.appendTo(in, out); // inefficient: re-parses the header (schema, sync, meta) on every call!
>   }
>
>   dataFileWriter.append(userToAppend);
> }
> byte[] serializedBytes = out.toByteArray();
> // then use serializedBytes to append to the remote file (Azure blob, AWS, ...)
> {code}
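> One way "inArrayHeader" might be fetched once is sketched below; "readRemotePrefix" and "remoteFilePath" are hypothetical placeholders for whatever remote store is in use:
> {code:java}
> // Sketch: fetch a prefix of the remote file large enough to contain the whole
> // header (magic bytes + metadata map + 16-byte sync marker).
> // "readRemotePrefix" and "remoteFilePath" are hypothetical placeholders;
> // 4096 bytes is an assumed upper bound on the header size, not a guarantee.
> byte[] inArrayHeader = readRemotePrefix(remoteFilePath, 4096);
> {code}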
> Using the new proposed helper method, you could compute the data blocks to append to a remote data file more simply and efficiently. The bytes written after appendTo() are just data blocks delimited by the original file's 16-byte sync marker, so they can be appended to the remote file byte-for-byte:
> {code:java}
> // run once at startup
> Header header;
> try (SeekableByteArrayInput in = new SeekableByteArrayInput(inArrayHeader);
>     DataFileReader<Object> reader = new DataFileReader<>(in, new GenericDatumReader<>())) {
>   header = reader.getHeader();
> }
>
> // streaming code to append+flush rows to remote data
> for (;;) {
>   User userToAppend = ...
>   ByteArrayOutputStream out = new ByteArrayOutputStream();
>   DatumWriter<User> writer = new SpecificDatumWriter<User>(User.getClassSchema());
>   try (DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(writer)) {
>     dataFileWriter.setCodec(CodecFactory.nullCodec());
>     dataFileWriter.appendTo(header, out); // efficient: no re-parsing of schema, sync, meta
>
>     dataFileWriter.append(userToAppend);
>   }
>   byte[] serializedBytes = out.toByteArray();
>   // then use serializedBytes to append to the remote file (Azure blob, AWS, ...)
>   ... remoteAzureFile.append(.. serializedBytes) .. remoteAzureFile.flush()
> }
> {code}
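> As a sketch, the final "append to remote file" step might look like this with an Azure append blob, assuming the legacy com.microsoft.azure:azure-storage SDK (a block blob would instead need to stage and re-commit blocks):
> {code:java}
> import java.io.ByteArrayInputStream;
>
> import com.microsoft.azure.storage.blob.CloudAppendBlob;
>
> // Sketch: append the freshly produced Avro data blocks to an append blob.
> // "container" and the blob name are placeholders (checked exceptions omitted).
> CloudAppendBlob remoteAzureFile = container.getAppendBlobReference("users.avro");
> remoteAzureFile.appendBlock(new ByteArrayInputStream(serializedBytes), serializedBytes.length);
> {code}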



--
This message was sent by Atlassian Jira
(v8.20.10#820010)