You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by bl...@apache.org on 2019/11/19 00:13:07 UTC
[avro] branch branch-1.9 updated: AVRO-2618: Support non-seekable
Streams in DataFileReader (#702)
This is an automated email from the ASF dual-hosted git repository.
blachniet pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 9e5289b AVRO-2618: Support non-seekable Streams in DataFileReader (#702)
9e5289b is described below
commit 9e5289b9fe7f66a0ed3481dc08c8f5933c1684db
Author: Sébastien Foutrier <se...@gmail.com>
AuthorDate: Tue Nov 19 01:11:31 2019 +0100
AVRO-2618: Support non-seekable Streams in DataFileReader (#702)
This commit adds support for non-seekable Streams as reading input in
the avro DataFileReader. This avoids forcing the user to fully download
a file from a network storage prior to reading it.
When creating a DataFileReader with a non-seekable Stream, features such as
Seek(), Sync(), PastSync(), PreviousSync() and Tell() cannot be used.
(cherry picked from commit 3c10c361a788add7d7f27fb6102148d6106f2544)
---
lang/csharp/src/apache/main/File/DataFileReader.cs | 48 +++++++++++++---------
lang/csharp/src/apache/main/File/DataFileWriter.cs | 9 +++-
lang/csharp/src/apache/test/File/FileTests.cs | 48 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 22 deletions(-)
diff --git a/lang/csharp/src/apache/main/File/DataFileReader.cs b/lang/csharp/src/apache/main/File/DataFileReader.cs
index d1f8452..b5f8c0d 100644
--- a/lang/csharp/src/apache/main/File/DataFileReader.cs
+++ b/lang/csharp/src/apache/main/File/DataFileReader.cs
@@ -106,22 +106,7 @@ namespace Avro.File
/// <returns>A new file reader</returns>
public static IFileReader<T> OpenReader(Stream inStream, Schema readerSchema, CreateDatumReader datumReaderFactory)
{
- if (!inStream.CanSeek)
- throw new AvroRuntimeException("Not a valid input stream - must be seekable!");
-
- if (inStream.Length < DataFileConstants.Magic.Length)
- throw new AvroRuntimeException("Not an Avro data file");
-
- // verify magic header
- byte[] magic = new byte[DataFileConstants.Magic.Length];
- inStream.Seek(0, SeekOrigin.Begin);
- for (int c = 0; c < magic.Length; c = inStream.Read(magic, c, magic.Length - c)) { }
- inStream.Seek(0, SeekOrigin.Begin);
-
- if (magic.SequenceEqual(DataFileConstants.Magic)) // current format
- return new DataFileReader<T>(inStream, readerSchema, datumReaderFactory); // (not supporting 1.2 or below, format)
-
- throw new AvroRuntimeException("Not an Avro data file");
+ return new DataFileReader<T>(inStream, readerSchema, datumReaderFactory); // (not supporting 1.2 or below, format)
}
DataFileReader(Stream stream, Schema readerSchema, CreateDatumReader datumReaderFactory)
@@ -191,6 +176,9 @@ namespace Avro.File
/// <inheritdoc/>
public void Seek(long position)
{
+ if (!_stream.CanSeek)
+ throw new AvroRuntimeException("Not a valid input stream - must be seekable!");
+
_stream.Position = position;
_decoder = new BinaryDecoder(_stream);
_datumDecoder = null;
@@ -245,6 +233,8 @@ namespace Avro.File
/// <inheritdoc/>
public long PreviousSync()
{
+ if (!_stream.CanSeek)
+ throw new AvroRuntimeException("Not a valid input stream - must be seekable!");
return _blockStart;
}
@@ -412,7 +402,8 @@ namespace Avro.File
private void BlockFinished()
{
- _blockStart = _stream.Position;
+ if (_stream.CanSeek)
+ _blockStart = _stream.Position;
}
private DataBlock NextRawBlock(DataBlock reuse)
@@ -460,10 +451,27 @@ namespace Avro.File
return true;
// check to ensure still data to read
- if (!DataLeft())
- return false;
+ if (_stream.CanSeek)
+ {
+ if (!DataLeft())
+ return false;
+
+ _blockRemaining = _decoder.ReadLong(); // read block count
+ }
+ else
+ {
+ // when the stream is not seekable, the only way to know if there is still
+ // some data to read is to reach the end and raise an AvroException here.
+ try
+ {
+ _blockRemaining = _decoder.ReadLong(); // read block count
+ }
+ catch(AvroException)
+ {
+ return false;
+ }
+ }
- _blockRemaining = _decoder.ReadLong(); // read block count
_blockSize = _decoder.ReadLong(); // read block size
if (_blockSize > System.Int32.MaxValue || _blockSize < 0)
{
diff --git a/lang/csharp/src/apache/main/File/DataFileWriter.cs b/lang/csharp/src/apache/main/File/DataFileWriter.cs
index 234f205..bb936d2 100644
--- a/lang/csharp/src/apache/main/File/DataFileWriter.cs
+++ b/lang/csharp/src/apache/main/File/DataFileWriter.cs
@@ -190,15 +190,20 @@ namespace Avro.File
public void Flush()
{
EnsureHeader();
- Sync();
+ SyncInternal();
}
/// <inheritdoc/>
public long Sync()
{
+ SyncInternal();
+ return _stream.Position;
+ }
+
+ private void SyncInternal()
+ {
AssertOpen();
WriteBlock();
- return _stream.Position;
}
/// <inheritdoc/>
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index 42e4d43..6b43e98 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -25,6 +25,7 @@ using Avro.Specific;
using System.Reflection;
using Avro.File;
using System.Linq;
+using System.IO.Compression;
namespace Avro.Test.File
{
@@ -199,6 +200,53 @@ namespace Avro.Test.File
}
/// <summary>
+ /// This test is a single test case of
+ /// <see cref="TestGenericData(string, object[], Codec.Type)"/> but introduces a
+ /// DeflateStream as it is a standard non-seekable Stream that has the same behavior as the
+ /// NetworkStream, which we should handle.
+ /// </summary>
+ [TestCase("{\"type\":\"record\", \"name\":\"n\", \"fields\":" +
+ "[{\"name\":\"f1\", \"type\":[\"int\", \"long\"]}]}",
+ new object[] { "f1", 100L }, Codec.Type.Null)]
+ public void TestNonSeekableStream(string schemaStr, object[] value, Codec.Type codecType)
+ {
+ foreach (var rwFactory in GenericOptions<GenericRecord>())
+ {
+ // Create and write out
+ MemoryStream compressedStream = new MemoryStream();
+ // using here a DeflateStream as it is a standard non-seekable stream, so if it works for this one,
+ // it should also works with any standard non-seekable stream (ie: NetworkStreams)
+ DeflateStream dataFileOutputStream = new DeflateStream(compressedStream, CompressionMode.Compress);
+ using (var writer = rwFactory.CreateWriter(dataFileOutputStream, Schema.Parse(schemaStr), Codec.CreateCodec(codecType)))
+ {
+ writer.Append(mkRecord(value, Schema.Parse(schemaStr) as RecordSchema));
+
+ // The Sync method is not supported for non-seekable streams.
+ Assert.Throws<NotSupportedException>(() => writer.Sync());
+ }
+
+ DeflateStream dataFileInputStream = new DeflateStream(new MemoryStream(compressedStream.ToArray()), CompressionMode.Decompress);
+
+ // Read back
+ IList<GenericRecord> readFoos = new List<GenericRecord>();
+ using (IFileReader<GenericRecord> reader = rwFactory.CreateReader(dataFileInputStream, null))
+ {
+ foreach (GenericRecord foo in reader.NextEntries)
+ {
+ readFoos.Add(foo);
+ }
+
+ // These methods are not supported for non-seekable streams.
+ Assert.Throws<AvroRuntimeException>(() => reader.Seek(0));
+ Assert.Throws<AvroRuntimeException>(() => reader.PreviousSync());
+ }
+
+ Assert.IsTrue((readFoos != null && readFoos.Count > 0),
+ string.Format(@"Generic object: {0} did not serialise/deserialise correctly", readFoos));
+ }
+ }
+
+ /// <summary>
/// Reading & writing of primitive objects
/// </summary>
/// <param name="schemaStr"></param>