You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by bl...@apache.org on 2019/11/19 00:13:07 UTC

[avro] branch branch-1.9 updated: AVRO-2618: Support non-seekable Streams in DataFileReader (#702)

This is an automated email from the ASF dual-hosted git repository.

blachniet pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 9e5289b  AVRO-2618: Support non-seekable Streams in DataFileReader (#702)
9e5289b is described below

commit 9e5289b9fe7f66a0ed3481dc08c8f5933c1684db
Author: Sébastien Foutrier <se...@gmail.com>
AuthorDate: Tue Nov 19 01:11:31 2019 +0100

    AVRO-2618: Support non-seekable Streams in DataFileReader (#702)
    
    This commit adds support for non-seekable Streams as input to the
    Avro DataFileReader, so users no longer have to fully download a file
    from network storage before reading it.
    When a DataFileReader is created with a non-seekable Stream, features such
    as Seek(), Sync(), PastSync(), PreviousSync() and Tell() can't be used.
    
    (cherry picked commit from 3c10c361a788add7d7f27fb6102148d6106f2544)
---
 lang/csharp/src/apache/main/File/DataFileReader.cs | 48 +++++++++++++---------
 lang/csharp/src/apache/main/File/DataFileWriter.cs |  9 +++-
 lang/csharp/src/apache/test/File/FileTests.cs      | 48 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/lang/csharp/src/apache/main/File/DataFileReader.cs b/lang/csharp/src/apache/main/File/DataFileReader.cs
index d1f8452..b5f8c0d 100644
--- a/lang/csharp/src/apache/main/File/DataFileReader.cs
+++ b/lang/csharp/src/apache/main/File/DataFileReader.cs
@@ -106,22 +106,7 @@ namespace Avro.File
         /// <returns>A new file reader</returns>
         public static IFileReader<T> OpenReader(Stream inStream, Schema readerSchema, CreateDatumReader datumReaderFactory)
         {
-            if (!inStream.CanSeek)
-                throw new AvroRuntimeException("Not a valid input stream - must be seekable!");
-
-            if (inStream.Length < DataFileConstants.Magic.Length)
-                throw new AvroRuntimeException("Not an Avro data file");
-
-            // verify magic header
-            byte[] magic = new byte[DataFileConstants.Magic.Length];
-            inStream.Seek(0, SeekOrigin.Begin);
-            for (int c = 0; c < magic.Length; c = inStream.Read(magic, c, magic.Length - c)) { }
-            inStream.Seek(0, SeekOrigin.Begin);
-
-            if (magic.SequenceEqual(DataFileConstants.Magic))   // current format
-                return new DataFileReader<T>(inStream, readerSchema, datumReaderFactory);         // (not supporting 1.2 or below, format)
-
-            throw new AvroRuntimeException("Not an Avro data file");
+            return new DataFileReader<T>(inStream, readerSchema, datumReaderFactory);         // NOTE(review): magic/length validation and the Seek(0) rewind were removed from here (format 1.2 and below is still unsupported) - confirm the constructor rejects non-Avro input and that seekable streams are positioned at the header
         }
 
         DataFileReader(Stream stream, Schema readerSchema, CreateDatumReader datumReaderFactory)
@@ -191,6 +176,9 @@ namespace Avro.File
         /// <inheritdoc/>
         public void Seek(long position)
         {
+            if (!_stream.CanSeek)
+                throw new AvroRuntimeException("Not a valid input stream - must be seekable!");
+
             _stream.Position = position;
             _decoder = new BinaryDecoder(_stream);
             _datumDecoder = null;
@@ -245,6 +233,8 @@ namespace Avro.File
         /// <inheritdoc/>
         public long PreviousSync()
         {
+            if (!_stream.CanSeek) // _blockStart is only updated for seekable streams (see BlockFinished)
+                throw new AvroRuntimeException("Not a valid input stream - must be seekable!");
             return _blockStart;
         }
 
@@ -412,7 +402,8 @@ namespace Avro.File
 
         private void BlockFinished()
         {
-            _blockStart = _stream.Position;
+            if (_stream.CanSeek) // Stream.Position throws NotSupportedException when the stream cannot seek
+                _blockStart = _stream.Position;
         }
 
         private DataBlock NextRawBlock(DataBlock reuse)
@@ -460,10 +451,27 @@ namespace Avro.File
                     return true;
 
                 // check to ensure still data to read
-                if (!DataLeft())
-                    return false;
+                if (_stream.CanSeek)
+                {
+                    if (!DataLeft())
+                        return false;
+
+                    _blockRemaining = _decoder.ReadLong();      // read block count
+                }
+                else
+                {
+                    // when the stream is not seekable, the only way to know if there is still
+                    // some data to read is to reach the end and raise an AvroException here.
+                    try
+                    {
+                        _blockRemaining = _decoder.ReadLong();      // read block count
+                    }
+                    catch(AvroException)
+                    {
+                        return false;
+                    }
+                }
 
-                _blockRemaining = _decoder.ReadLong();      // read block count
                 _blockSize = _decoder.ReadLong();           // read block size
                 if (_blockSize > System.Int32.MaxValue || _blockSize < 0)
                 {
diff --git a/lang/csharp/src/apache/main/File/DataFileWriter.cs b/lang/csharp/src/apache/main/File/DataFileWriter.cs
index 234f205..bb936d2 100644
--- a/lang/csharp/src/apache/main/File/DataFileWriter.cs
+++ b/lang/csharp/src/apache/main/File/DataFileWriter.cs
@@ -190,15 +190,20 @@ namespace Avro.File
         public void Flush()
         {
             EnsureHeader();
-            Sync();
+            SyncInternal(); // not Sync(): Sync() also reads _stream.Position, which non-seekable streams do not support
         }
 
         /// <inheritdoc/>
         public long Sync()
         {
+            SyncInternal();
+            return _stream.Position; // throws NotSupportedException on non-seekable streams
+        }
+
+        private void SyncInternal() // AssertOpen() + WriteBlock(), without reading the stream position
+        {
             AssertOpen();
             WriteBlock();
-            return _stream.Position;
         }
 
         /// <inheritdoc/>
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index 42e4d43..6b43e98 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -25,6 +25,7 @@ using Avro.Specific;
 using System.Reflection;
 using Avro.File;
 using System.Linq;
+using System.IO.Compression;
 
 namespace Avro.Test.File
 {
@@ -199,6 +200,53 @@ namespace Avro.Test.File
         }
 
         /// <summary>
+        /// A single case of <see cref="TestGenericData(string, object[], Codec.Type)"/> that
+        /// writes and reads through a DeflateStream instead: it is a standard non-seekable
+        /// Stream that behaves like a NetworkStream, which the Avro file reader and writer
+        /// must handle.
+        /// </summary>
+        [TestCase("{\"type\":\"record\", \"name\":\"n\", \"fields\":" +
+            "[{\"name\":\"f1\", \"type\":[\"int\", \"long\"]}]}",
+            new object[] { "f1", 100L }, Codec.Type.Null)]
+        public void TestNonSeekableStream(string schemaStr, object[] value, Codec.Type codecType)
+        {
+            foreach (var rwFactory in GenericOptions<GenericRecord>())
+            {
+                // Create and write out
+                MemoryStream compressedStream = new MemoryStream();
+                // DeflateStream is a standard non-seekable stream, so if this works it should
+                // also work with any other non-seekable stream (e.g. NetworkStream).
+                DeflateStream dataFileOutputStream = new DeflateStream(compressedStream, CompressionMode.Compress);
+                using (var writer = rwFactory.CreateWriter(dataFileOutputStream, Schema.Parse(schemaStr), Codec.CreateCodec(codecType)))
+                {
+                    writer.Append(mkRecord(value, Schema.Parse(schemaStr) as RecordSchema));
+
+                    // Sync() returns the stream position, which non-seekable streams do not support.
+                    Assert.Throws<NotSupportedException>(() => writer.Sync());
+                }
+
+                DeflateStream dataFileInputStream = new DeflateStream(new MemoryStream(compressedStream.ToArray()), CompressionMode.Decompress);
+
+                // Read back
+                IList<GenericRecord> readFoos = new List<GenericRecord>();
+                using (IFileReader<GenericRecord> reader = rwFactory.CreateReader(dataFileInputStream, null))
+                {
+                    foreach (GenericRecord foo in reader.NextEntries)
+                    {
+                        readFoos.Add(foo);
+                    }
+
+                    // Seek() and PreviousSync() require a seekable stream.
+                    Assert.Throws<AvroRuntimeException>(() => reader.Seek(0));
+                    Assert.Throws<AvroRuntimeException>(() => reader.PreviousSync());
+                }
+
+                Assert.AreEqual(1, readFoos.Count,
+                               "Generic record did not serialise/deserialise correctly through a non-seekable stream");
+            }
+        }
+
+        /// <summary>
         /// Reading & writing of primitive objects
         /// </summary>
         /// <param name="schemaStr"></param>