You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2020/05/22 12:34:00 UTC

[avro] branch master updated: AVRO-2784: Add the possibility to append to existing files in C# (#853)

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d8c857  AVRO-2784: Add the possibility to append to existing files in C# (#853)
2d8c857 is described below

commit 2d8c857a291ab61ee32e3e2b8ced02f6af6a5c76
Author: Vladimir <va...@users.noreply.github.com>
AuthorDate: Fri May 22 14:33:51 2020 +0200

    AVRO-2784: Add the possibility to append to existing files in C# (#853)
    
    * Implementation of Java AppendTo() as is
    
    * API optimisation.
    
    * optimisation.
    
    * naming error fix
    
    * Before unit tests.
    
    * Unit tests draft
    
    * unit tests
    
    * Suppress CA1000 warnings for two new methods
    
    Suppression performed to maintain compatibility
    
    Co-authored-by: Volodymyr Aleksandrov <vo...@pl.abb.com>
---
 lang/csharp/src/apache/main/File/DataFileWriter.cs |  79 +++++++-
 lang/csharp/src/apache/main/GlobalSuppressions.cs  |   3 +
 lang/csharp/src/apache/test/File/FileTests.cs      | 199 ++++++++++++++++++---
 3 files changed, 255 insertions(+), 26 deletions(-)

diff --git a/lang/csharp/src/apache/main/File/DataFileWriter.cs b/lang/csharp/src/apache/main/File/DataFileWriter.cs
index bf94d7f..c044b96 100644
--- a/lang/csharp/src/apache/main/File/DataFileWriter.cs
+++ b/lang/csharp/src/apache/main/File/DataFileWriter.cs
@@ -19,8 +19,8 @@ using System;
 using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
-using Avro.IO;
 using Avro.Generic;
+using Avro.IO;
 
 namespace Avro.File
 {
@@ -126,7 +126,41 @@ namespace Avro.File
             return new DataFileWriter<T>(writer).Create(writer.Schema, outStream, codec, leaveOpen);
         }
 
-        DataFileWriter(DatumWriter<T> writer)
+        /// <summary>
+        /// Open a new writer instance to append to a file path.
+        /// </summary>
+        /// <param name="writer">Datum writer to use.</param>
+        /// <param name="path">Path to the file.</param>
+        /// <returns>A new file writer.</returns>
+        public static IFileWriter<T> OpenAppendWriter(DatumWriter<T> writer, string path)
+        {
+            return new DataFileWriter<T>(writer).AppendTo(path);
+        }
+
+        /// <summary>
+        /// Open a new writer instance to append to an output stream.
+        /// Both in and out streams must point to the same file.
+        /// </summary>
+        /// <param name="writer">Datum writer to use.</param>
+        /// <param name="inStream">Readable stream over the existing file; its header is read to recover the schema, sync marker and codec.</param>
+        /// <param name="outStream">stream to write to, positioned at the end of the existing file.</param>
+        /// <returns>A new file writer.</returns>
+        public static IFileWriter<T> OpenAppendWriter(DatumWriter<T> writer, Stream inStream, Stream outStream)
+        {
+            if (!inStream.CanRead)
+            {
+                throw new AvroRuntimeException($"{nameof(inStream)} must have Read access");
+            }
+
+            if (!outStream.CanWrite)
+            {
+                throw new AvroRuntimeException($"{nameof(outStream)} must have Write access");
+            }
+
+            return new DataFileWriter<T>(writer).AppendTo(inStream, outStream);
+        }
+
+        private DataFileWriter(DatumWriter<T> writer)
         {
             _writer = writer;
             _syncInterval = DataFileConstants.DefaultSyncInterval;
@@ -205,6 +239,47 @@ namespace Avro.File
             WriteIfBlockFull();
         }
 
+        // Appends to the file at path. On success outStream is owned by this writer and closed by its Close().
+        private IFileWriter<T> AppendTo(string path)
+        {
+            using (var inStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+            {
+                var outStream = new FileStream(path, FileMode.Append);
+                // Reading the existing header can throw; dispose outStream then so the file handle is not leaked.
+                try { return AppendTo(inStream, outStream); }
+                catch { outStream.Dispose(); throw; }
+            }
+        }
+
+        private IFileWriter<T> AppendTo(Stream inStream, Stream outStream)
+        {
+            using (var dataFileReader = DataFileReader<T>.OpenReader(inStream))
+            {
+                var header = dataFileReader.GetHeader();
+                _schema = header.Schema;
+                _syncData = header.SyncData;
+                _metaData = header.MetaData;
+            }
+
+            if (_metaData.TryGetValue(DataFileConstants.MetaDataCodec, out byte[] codecBytes))
+            {
+                string codec = System.Text.Encoding.UTF8.GetString(codecBytes);
+                _codec = Codec.CreateCodecFromString(codec);
+            }
+            else
+            {
+                _codec = Codec.CreateCodec(Codec.Type.Null);
+            }
+
+            _headerWritten = true;
+            _stream = outStream;
+            _stream.Seek(0, SeekOrigin.End);
+
+            Init();
+
+            return this;
+        }
+
         private void EnsureHeader()
         {
             if (!_headerWritten)
diff --git a/lang/csharp/src/apache/main/GlobalSuppressions.cs b/lang/csharp/src/apache/main/GlobalSuppressions.cs
index ea2dcb1..64b5fb8 100644
--- a/lang/csharp/src/apache/main/GlobalSuppressions.cs
+++ b/lang/csharp/src/apache/main/GlobalSuppressions.cs
@@ -20,6 +20,7 @@
 // attributes that are applied to this project.
 // Project-level suppressions either have no target or are given
 // a specific target and scoped to a namespace, type, member, etc.
+
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.OpenReader(System.IO.Stream)~Avro.File.IFileReader{`0}")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.OpenReader(System.IO.Stream,System.Boolean)~Avro.File.IFileReader{`0}")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.OpenReader(System.IO.Stream,Avro.Schema)~Avro.File.IFileReader{`0}")]
@@ -34,6 +35,8 @@
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenWriter(Avro.Generic.DatumWriter{`0},System.IO.Stream,Avro.File.Codec,System.Boolean)~Avro.File.IFileWriter{`0}")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenWriter(Avro.Generic.DatumWriter{`0},System.String)~Avro.File.IFileWriter{`0}")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenWriter(Avro.Generic.DatumWriter{`0},System.String,Avro.File.Codec)~Avro.File.IFileWriter{`0}")]
+[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenAppendWriter(Avro.Generic.DatumWriter{`0},System.String)~Avro.File.IFileWriter{`0}")]
+[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenAppendWriter(Avro.Generic.DatumWriter{`0},System.IO.Stream,System.IO.Stream)~Avro.File.IFileWriter{`0}")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Purposely catch any exception", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.Sync(System.Int64)")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Purposely catch any exception", Scope = "member", Target = "~M:Avro.Specific.ObjectCreator.FindType(System.String)~System.Type")]
 [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Purposely catch any exception", Scope = "member", Target = "~M:Avro.Specific.ObjectCreator.GetType(Avro.Schema)~System.Type")]
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index d748391..9229bf4 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -17,15 +17,14 @@
  */
 using System;
 using System.Collections;
-using System.IO;
 using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using Avro.File;
 using Avro.Generic;
-using NUnit.Framework;
 using Avro.Specific;
-using System.Reflection;
-using Avro.File;
-using System.Linq;
-using System.IO.Compression;
+using NUnit.Framework;
 
 namespace Avro.Test.File
 {
@@ -97,6 +96,72 @@ namespace Avro.Test.File
         }
 
         /// <summary>
+        /// Test appending of specific (custom) record objects
+        /// </summary>
+        /// <param name="schemaStr">schema</param>
+        /// <param name="recs">initial records</param>
+        /// <param name="appendRecs">append records</param>
+        /// <param name="codecType">initial compression codec type</param>
+        [TestCase(specificSchema, new object[] { new object[] { "John", 23 } }, new object[] { new object[] { "Jane", 21 } }, Codec.Type.Deflate, TestName = "TestAppendSpecificData0")]
+        [TestCase(specificSchema, new object[] { new object[] { "John", 23 } }, new object[] { new object[] { "Jane", 21 } }, Codec.Type.Null, TestName = "TestAppendSpecificData1")]
+        [TestCase(specificSchema, new object[] { new object[] {"John", 23}, new object[] { "Jane", 99 }, new object[] { "Jeff", 88 },
+                                                 new object[] {"James", 13}, new object[] { "June", 109 }, new object[] { "Lloyd", 18 },
+                                                 new object[] {"Jenny", 3}, new object[] { "Bob", 9 }, new object[] { null, 48 }},
+                                  new object[] { new object[] { "Hillary", 79 },
+                                                 new object[] { "Grant", 88 } }, Codec.Type.Deflate, TestName = "TestAppendSpecificData2")]
+        [TestCase(specificSchema, new object[] { new object[] {"John", 23}, new object[] { "Jane", 99 }, new object[] { "Jeff", 88 },
+                                                 new object[] {"James", 13}, new object[] { "June", 109 }, new object[] { "Lloyd", 18 },
+                                                 new object[] {"Jenny", 3}, new object[] { "Bob", 9 }, new object[] { null, 48 }},
+                                  new object[] { new object[] { "Hillary", 79 },
+                                                 new object[] { "Grant", 88 } }, Codec.Type.Null, TestName = "TestAppendSpecificData3")]
+        public void TestAppendSpecificData(string schemaStr, object[] recs, object[] appendRecs, Codec.Type codecType)
+        {
+            IList<Foo> records = MakeRecords(recs);
+            IList<Foo> appendRecords = MakeRecords(appendRecs);
+            IList<Foo> allRecords = records.Concat(appendRecords).ToList();
+
+            foreach (var rwFactory in SpecificOptions<Foo>())
+            {
+                // create and write out
+                MemoryStream dataFileOutputStream = new MemoryStream();
+                Schema schema = Schema.Parse(schemaStr);
+                using (IFileWriter<Foo> dataFileWriter = rwFactory.CreateWriter(dataFileOutputStream, schema, Codec.CreateCodec(codecType)))
+                {
+                    foreach (Foo rec in records)
+                        dataFileWriter.Append(rec);
+                }
+
+                // append records
+                byte[] outputData = dataFileOutputStream.ToArray();
+                MemoryStream dataFileAppendInputStream = new MemoryStream(dataFileOutputStream.ToArray());
+                MemoryStream dataFileAppendStream = new MemoryStream(); // a MemoryStream wrapping a byte[] is not expandable, so copy into a fresh one
+                dataFileAppendStream.Write(outputData, 0, outputData.Length);
+
+                using (IFileWriter<Foo> appendFileWriter = rwFactory.CreateAppendWriter(dataFileAppendInputStream, dataFileAppendStream, schema))
+                {
+                    foreach (Foo rec in appendRecords)
+                        appendFileWriter.Append(rec);
+                }
+
+                MemoryStream dataFileInputStream = new MemoryStream(dataFileAppendStream.ToArray());
+
+                // read back
+                IList<Foo> readRecords = new List<Foo>();
+
+                using (IFileReader<Foo> reader = rwFactory.CreateReader(dataFileInputStream, null))
+                {
+                    foreach (Foo rec in reader.NextEntries)
+                        readRecords.Add(rec);
+                }
+
+                // compare objects via Json
+                Assert.AreEqual(allRecords.Count, readRecords.Count);
+                for (int i = 0; i < allRecords.Count; i++)
+                    Assert.AreEqual(allRecords[i].ToString(), readRecords[i].ToString());
+            }
+        }
+
+        /// <summary>
         /// Reading & writing of generic record objects
         /// </summary>
         /// <param name="schemaStr"></param>
@@ -200,6 +265,82 @@ namespace Avro.Test.File
         }
 
         /// <summary>
+        /// Test appending of generic record objects
+        /// </summary>
+        /// <param name="schemaStr">schema</param>
+        /// <param name="recs">initial records</param>
+        /// <param name="appendRecs">append records</param>
+        /// <param name="codecType">initial compression codec type</param>
+        [TestCase("{\"type\":\"record\", \"name\":\"n\", \"fields\":[{\"name\":\"f1\", \"type\":\"boolean\"}]}",
+            new object[] { "f1", true }, new object[] { "f1", false }, Codec.Type.Deflate)]
+        [TestCase("{\"type\":\"record\", \"name\":\"n\", \"fields\":[{\"name\":\"f1\", \"type\":\"int\"}]}",
+            new object[] { "f1", 1 }, new object[] { "f1", 2 }, Codec.Type.Null)]
+        public void TestAppendGenericData(string schemaStr, object[] recs, object[] appendRecs, Codec.Type codecType)
+        {
+            foreach (var rwFactory in GenericOptions<GenericRecord>())
+            {
+                // Create and write out
+                MemoryStream dataFileOutputStream = new MemoryStream();
+                using (var writer = rwFactory.CreateWriter(dataFileOutputStream, Schema.Parse(schemaStr), Codec.CreateCodec(codecType)))
+                {
+                    writer.Append(mkRecord(recs, Schema.Parse(schemaStr) as RecordSchema));
+                }
+
+                // append records
+                byte[] outputData = dataFileOutputStream.ToArray();
+                MemoryStream dataFileAppendInputStream = new MemoryStream(dataFileOutputStream.ToArray());
+                MemoryStream dataFileAppendStream = new MemoryStream(); // a MemoryStream wrapping a byte[] is not expandable, so copy into a fresh one
+                dataFileAppendStream.Write(outputData, 0, outputData.Length);
+
+                using (var appendFileWriter = rwFactory.CreateAppendWriter(dataFileAppendInputStream, dataFileAppendStream, Schema.Parse(schemaStr)))
+                {
+                    appendFileWriter.Append(mkRecord(appendRecs, Schema.Parse(schemaStr) as RecordSchema));
+                }
+
+                MemoryStream dataFileInputStream = new MemoryStream(dataFileAppendStream.ToArray());
+
+                // Read back
+                IList<GenericRecord> readFoos = new List<GenericRecord>();
+                using (IFileReader<GenericRecord> reader = rwFactory.CreateReader(dataFileInputStream, null))
+                {
+                    foreach (GenericRecord foo in reader.NextEntries)
+                    {
+                        readFoos.Add(foo);
+                    }
+                }
+
+                Assert.NotNull(readFoos);
+                Assert.AreEqual((recs.Length + appendRecs.Length) / 2, readFoos.Count,
+                    $"Generic object: {readFoos} did not serialise/deserialise correctly");
+            }
+        }
+
+        [Test]
+        public void OpenAppendWriter_IncorrectInStream_Throws()
+        {
+            MemoryStream compressedStream = new MemoryStream();
+            // A DeflateStream opened in Compress mode is write-only (CanRead is false), so OpenAppendWriter
+            // must reject it as the input stream; the CanRead check runs before the other arguments are used.
+            DeflateStream dataFileInputStream = new DeflateStream(compressedStream, CompressionMode.Compress);
+
+            var action = new TestDelegate(() => DataFileWriter<Foo>.OpenAppendWriter(null, dataFileInputStream, null));
+
+            Assert.Throws(typeof(AvroRuntimeException), action);
+        }
+
+        [Test]
+        public void OpenAppendWriter_IncorrectOutStream_Throws()
+        {
+            MemoryStream inStream = new MemoryStream();
+            MemoryStream outStream = new MemoryStream();
+            outStream.Close();
+
+            var action = new TestDelegate(() => DataFileWriter<Foo>.OpenAppendWriter(null, inStream, outStream));
+
+            Assert.Throws(typeof(AvroRuntimeException), action);
+        }
+
+        /// <summary>
         /// This test is a single test case of
         /// <see cref="TestGenericData(string, object[], Codec.Type)"/> but introduces a
         /// DeflateStream as it is a standard non-seekable Stream that has the same behavior as the
@@ -678,7 +819,7 @@ namespace Avro.Test.File
             }
         }
 
-        private bool CheckPrimitive<T>(Stream input, T value, ReaderWriterPair<T>.ReaderFactory createReader)
+        private bool CheckPrimitive<T>(Stream input, T value, ReaderWriterSet<T>.ReaderFactory createReader)
         {
             IFileReader<T> reader = createReader(input, null);
             IList<T> readFoos = new List<T>();
@@ -803,49 +944,59 @@ namespace Avro.Test.File
                                   new object[] {"Jenny", 3}, new object[] { "Bob", 9 }, new object[] { null, 48 }};
         }
 
-        private static IEnumerable<ReaderWriterPair<T>> SpecificOptions<T>()
+        private static IEnumerable<ReaderWriterSet<T>> SpecificOptions<T>()
         {
-            yield return new ReaderWriterPair<T>
-                             {
-                                 CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema),
-                                 CreateWriter = (stream, schema, codec) =>
-                                     DataFileWriter<T>.OpenWriter(new SpecificWriter<T>(schema), stream, codec )
-                             };
-
-            yield return new ReaderWriterPair<T>
+            yield return new ReaderWriterSet<T>
+            {
+                CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema),
+                CreateWriter = (stream, schema, codec) =>
+                    DataFileWriter<T>.OpenWriter(new SpecificWriter<T>(schema), stream, codec),
+                CreateAppendWriter = (inStream, outStream, schema) =>
+                    DataFileWriter<T>.OpenAppendWriter(new SpecificWriter<T>(schema), inStream, outStream)
+            };
+
+            yield return new ReaderWriterSet<T>
                              {
                                  CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema,
                                      (ws, rs) => new SpecificDatumReader<T>(ws, rs)),
                                  CreateWriter = (stream, schema, codec) =>
-                                     DataFileWriter<T>.OpenWriter(new SpecificDatumWriter<T>(schema), stream, codec )
-                             };
+                                     DataFileWriter<T>.OpenWriter(new SpecificDatumWriter<T>(schema), stream, codec ),
+                                 CreateAppendWriter = (inStream, outStream, schema) =>
+                                     DataFileWriter<T>.OpenAppendWriter(new SpecificDatumWriter<T>(schema), inStream, outStream)
+            };
         }
 
-        private static IEnumerable<ReaderWriterPair<T>> GenericOptions<T>()
+        private static IEnumerable<ReaderWriterSet<T>> GenericOptions<T>()
         {
-            yield return new ReaderWriterPair<T>
+            yield return new ReaderWriterSet<T>
                              {
                                  CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema),
                                  CreateWriter = (stream, schema, codec) =>
-                                     DataFileWriter<T>.OpenWriter(new GenericWriter<T>(schema), stream, codec )
+                                     DataFileWriter<T>.OpenWriter(new GenericWriter<T>(schema), stream, codec ),
+                                 CreateAppendWriter = (inStream, outStream, schema) =>
+                                     DataFileWriter<T>.OpenAppendWriter(new GenericWriter<T>(schema), inStream, outStream)
                              };
 
-            yield return new ReaderWriterPair<T>
+            yield return new ReaderWriterSet<T>
                              {
                                  CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema,
                                      (ws, rs) => new GenericDatumReader<T>(ws, rs)),
                                  CreateWriter = (stream, schema, codec) =>
-                                     DataFileWriter<T>.OpenWriter(new GenericDatumWriter<T>(schema), stream, codec )
+                                     DataFileWriter<T>.OpenWriter(new GenericDatumWriter<T>(schema), stream, codec ),
+                                 CreateAppendWriter = (inStream, outStream, schema) =>
+                                    DataFileWriter<T>.OpenAppendWriter(new GenericDatumWriter<T>(schema), inStream, outStream)
                              };
         }
 
-        class ReaderWriterPair<T>
+        class ReaderWriterSet<T>
         {
             public delegate IFileWriter<T> WriterFactory(Stream stream, Schema writerSchema, Codec codec);
             public delegate IFileReader<T> ReaderFactory(Stream stream, Schema readerSchema);
+            public delegate IFileWriter<T> AppendFactory(Stream inStream, Stream outStream, Schema writerSchema);
 
             public WriterFactory CreateWriter { get; set; }
             public ReaderFactory CreateReader { get; set; }
+            public AppendFactory CreateAppendWriter { get; set; }
         }
     }