Posted to dev@avro.apache.org by "Asen Milchev Kolev (Jira)" <ji...@apache.org> on 2019/08/20 08:45:00 UTC
[jira] [Created] (AVRO-2511) Avro Java DataFileWriter Flush() does not flush the buffer to disk
Asen Milchev Kolev created AVRO-2511:
----------------------------------------
Summary: Avro Java DataFileWriter Flush() does not flush the buffer to disk
Key: AVRO-2511
URL: https://issues.apache.org/jira/browse/AVRO-2511
Project: Apache Avro
Issue Type: Bug
Components: java
Affects Versions: 1.9.0
Reporter: Asen Milchev Kolev
If you use flush() with an output stream instead of a file, the buffer is not flushed to disk. Here is an example of how I'm using it; the data never appears on disk. Is that by design, or is it a bug? I need this behavior in order to determine the current file size and roll over to a new file when the maximum file size is reached.
{code:java}
// ...
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>());
     DataFileStream<GenericRecord> dataFileStreamReader = new DataFileStream<>(is, new GenericDatumReader<GenericRecord>())) {
    dataFileWriter.setFlushOnEveryBlock(true);
    FSDataOutputStream hdfsOutputStream = null;
    dataFileWriter.setCodec(codecFactory);
    Schema schema = dataFileStreamReader.getSchema();
    if (fileMode.equals(FileMode.APPEND)) {
        FileContext fc = FileContext.getFileContext(hdfsConfiguration);
        hdfsOutputStream = fileSystem.append(hdfsPath);
        dataFileWriter.appendTo(new AvroFSInput(fc, hdfsPath), hdfsOutputStream);
    } else {
        hdfsOutputStream = fileSystem.create(hdfsPath);
        fileManager.setCreationTime(hdfsPath);
        dataFileWriter.create(schema, hdfsOutputStream);
    }

    GenericRecord genericRecord = null;
    while (dataFileStreamReader.hasNext()) {
        if (fileManager.isLimitsReached()) {
            IOUtils.closeStream(dataFileWriter);
            fileSystem.rename(hdfsPath, fileManager.getFinalPath(hdfsPath));
            LOG.info("Avro write completed for {0}", hdfsPath.toString());
            hdfsPath = fileManager.getPath();
            storePaths.add(hdfsPath);
            hdfsOutputStream = fileSystem.create(hdfsPath);
            fileManager.setCreationTime(hdfsPath);
            dataFileWriter.setCodec(codecFactory);
            dataFileWriter.create(schema, hdfsOutputStream);
            LOG.info("Initiate Avro write to {0}", hdfsPath.toString());
        }
        genericRecord = dataFileStreamReader.next(genericRecord);
        dataFileWriter.append(genericRecord);
        dataFileWriter.flush(); // doesn't work at all when we are using streams and not directly files!
        fileManager.updateEntryCount(hdfsPath);
    }
}
{code}
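For context: DataFileWriter.flush() writes Avro's internal block buffer into the underlying OutputStream, but whether those bytes then become durable depends on that stream's own semantics. With an HDFS FSDataOutputStream, a plain flush() does not guarantee visibility on the datanodes; HDFS provides hflush()/hsync() for that. The following is a sketch of one possible workaround, not a confirmed resolution of this issue; it assumes the {{dataFileWriter}} and {{hdfsOutputStream}} variables from the snippet above.
{code:java}
// Hypothetical workaround sketch (assumes variables from the snippet above).
dataFileWriter.flush();       // push Avro's block buffer into hdfsOutputStream
hdfsOutputStream.hflush();    // HDFS: make the written bytes visible to new readers
// hdfsOutputStream.hsync();  // stronger guarantee: ask datanodes to sync to disk
long currentSize = hdfsOutputStream.getPos(); // usable for max-file-size rolling
{code}
If this is the intended behavior, it may be worth documenting on DataFileWriter.flush() that durability of the underlying stream is the caller's responsibility.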
--
This message was sent by Atlassian Jira
(v8.3.2#803003)