Posted to dev@avro.apache.org by "Asen Milchev Kolev (Jira)" <ji...@apache.org> on 2019/08/20 08:45:00 UTC

[jira] [Created] (AVRO-2511) Avro Java DataFileWriter Flush() does not flush the buffer to disk

Asen Milchev Kolev created AVRO-2511:
----------------------------------------

             Summary: Avro Java DataFileWriter Flush() does not flush the buffer to disk
                 Key: AVRO-2511
                 URL: https://issues.apache.org/jira/browse/AVRO-2511
             Project: Apache Avro
          Issue Type: Bug
          Components: java
    Affects Versions: 1.9.0
            Reporter: Asen Milchev Kolev


If you use flush() with an output stream instead of a file, the buffer is not flushed to disk. Here is an example of how I'm using it; the data is never flushed to disk. Is that by design, or is it a bug? I really need this in order to determine the file size and create a new file when the maximum file size is reached!
{code:java}
........
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>());
     DataFileStream<GenericRecord> dataFileStreamReader = new DataFileStream<>(is, new GenericDatumReader<GenericRecord>()))
{
    dataFileWriter.setFlushOnEveryBlock(true);
    FSDataOutputStream hdfsOutputStream = null;
    dataFileWriter.setCodec(codecFactory);
    Schema schema = dataFileStreamReader.getSchema();
    if (fileMode.equals(FileMode.APPEND))
    {
        FileContext fc = FileContext.getFileContext(hdfsConfiguration);
        hdfsOutputStream = fileSystem.append(hdfsPath);
        dataFileWriter.appendTo(new AvroFSInput(fc, hdfsPath), hdfsOutputStream);
    }
    else
    {
        hdfsOutputStream = fileSystem.create(hdfsPath);
        fileManager.setCreationTime(hdfsPath);
        dataFileWriter.create(schema, hdfsOutputStream);
    }

    GenericRecord genericRecord = null;
    while (dataFileStreamReader.hasNext())
    {
        if (fileManager.isLimitsReached())
        {
            IOUtils.closeStream(dataFileWriter);
            fileSystem.rename(hdfsPath, fileManager.getFinalPath(hdfsPath));
            LOG.info("Avro write completed for {0}", hdfsPath.toString());
            hdfsPath = fileManager.getPath();
            storePaths.add(hdfsPath);
            hdfsOutputStream = fileSystem.create(hdfsPath);
            fileManager.setCreationTime(hdfsPath);
            dataFileWriter.setCodec(codecFactory);
            dataFileWriter.create(schema, hdfsOutputStream);
            LOG.info("Initiate Avro write to {0}", hdfsPath.toString());
        }
        genericRecord = dataFileStreamReader.next(genericRecord);
        dataFileWriter.append(genericRecord);
        dataFileWriter.flush(); // doesn't work at all when we are using streams and not directly files!
        fileManager.updateEntryCount(hdfsPath);
    }
{code}
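
For reference, here is a minimal workaround sketch (an assumption on my part, not a confirmed fix): DataFileWriter.flush() writes the pending Avro block into the underlying OutputStream, but for HDFS a plain OutputStream.flush() does not make the data durable on the datanodes, so hflush() (or hsync()) on the FSDataOutputStream must be called separately. Keeping a reference to the stream also allows getPos() to be used for the size-based rollover. The schema, path, and size limit below are illustrative, not from the original code.
{code:java}
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AvroHdfsFlushSketch {

    // Illustrative rollover threshold, not from the original report.
    private static final long MAX_FILE_SIZE_BYTES = 64L * 1024 * 1024;

    public static void main(String[] args) throws IOException {
        // Illustrative schema with a single string field.
        Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("message")
                .endRecord();

        FileSystem fileSystem = FileSystem.get(new Configuration());
        Path hdfsPath = new Path("/tmp/events.avro"); // illustrative path

        // Keep a handle on the FSDataOutputStream: DataFileWriter does not
        // expose its underlying stream, so hflush()/getPos() must be called
        // on a reference we retain ourselves.
        FSDataOutputStream hdfsOutputStream = fileSystem.create(hdfsPath);

        try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, hdfsOutputStream);

            GenericRecord record = new GenericData.Record(schema);
            record.put("message", "hello");
            writer.append(record);

            // Step 1: write the pending Avro block into the HDFS stream.
            writer.flush();
            // Step 2: push the stream's buffer out to the datanodes so the
            // bytes become visible to readers.
            hdfsOutputStream.hflush();

            // getPos() now reflects the bytes written, which makes a
            // size-based rollover check like the one above possible.
            if (hdfsOutputStream.getPos() >= MAX_FILE_SIZE_BYTES) {
                // close this writer and open a new file here (omitted)
            }
        }
    }
}
{code}
hsync() can be used in place of hflush() when the data must also be forced to disk on each datanode, at a higher latency cost.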



--
This message was sent by Atlassian Jira
(v8.3.2#803003)