You are viewing a plain-text version of this content. Its canonical link (the "here" hyperlink) is not preserved in this copy.
Posted to commits@avro.apache.org by dk...@apache.org on 2020/05/22 12:34:00 UTC
[avro] branch master updated: AVRO-2784: Add the possibility to
append to existing files in C# (#853)
This is an automated email from the ASF dual-hosted git repository.
dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 2d8c857 AVRO-2784: Add the possibility to append to existing files in C# (#853)
2d8c857 is described below
commit 2d8c857a291ab61ee32e3e2b8ced02f6af6a5c76
Author: Vladimir <va...@users.noreply.github.com>
AuthorDate: Fri May 22 14:33:51 2020 +0200
AVRO-2784: Add the possibility to append to existing files in C# (#853)
* Implementation of Java's appendTo(), ported as-is
* API optimisation.
* optimisation.
* naming error fix
* Before unit tests.
* Unit tests draft
* unit tests
* Suppress CA1000 warnings for two new methods
Suppression performed to maintain compatibility
Co-authored-by: Volodymyr Aleksandrov <vo...@pl.abb.com>
---
lang/csharp/src/apache/main/File/DataFileWriter.cs | 79 +++++++-
lang/csharp/src/apache/main/GlobalSuppressions.cs | 3 +
lang/csharp/src/apache/test/File/FileTests.cs | 199 ++++++++++++++++++---
3 files changed, 255 insertions(+), 26 deletions(-)
diff --git a/lang/csharp/src/apache/main/File/DataFileWriter.cs b/lang/csharp/src/apache/main/File/DataFileWriter.cs
index bf94d7f..c044b96 100644
--- a/lang/csharp/src/apache/main/File/DataFileWriter.cs
+++ b/lang/csharp/src/apache/main/File/DataFileWriter.cs
@@ -19,8 +19,8 @@ using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
-using Avro.IO;
using Avro.Generic;
+using Avro.IO;
namespace Avro.File
{
@@ -126,7 +126,41 @@ namespace Avro.File
return new DataFileWriter<T>(writer).Create(writer.Schema, outStream, codec, leaveOpen);
}
- DataFileWriter(DatumWriter<T> writer)
+ /// <summary>
+ /// Opens a writer that appends records to an existing Avro container file on disk.
+ /// The schema, sync marker, metadata and codec are adopted from the header that is
+ /// already stored in the file rather than supplied by the caller.
+ /// </summary>
+ /// <param name="writer">Datum writer used to serialize the appended records.</param>
+ /// <param name="path">Path of the existing container file to extend.</param>
+ /// <returns>A file writer whose output is positioned at the end of the file.</returns>
+ public static IFileWriter<T> OpenAppendWriter(DatumWriter<T> writer, string path) =>
+ new DataFileWriter<T>(writer).AppendTo(path);
+
+ /// <summary>
+ /// Open a new writer instance that appends to an existing Avro container file through streams.
+ /// Both streams must refer to the same file: the schema, codec, sync marker and metadata are
+ /// read from <paramref name="inStream"/>, and new blocks are written to <paramref name="outStream"/>.
+ /// </summary>
+ /// <param name="writer">Datum writer to use.</param>
+ /// <param name="inStream">Readable stream over the existing file, used to read its header.</param>
+ /// <param name="outStream">Writable, seekable stream to write to; it is moved to its end before appending.</param>
+ /// <returns>A new file writer.</returns>
+ /// <exception cref="ArgumentNullException"><paramref name="inStream"/> or <paramref name="outStream"/> is null.</exception>
+ /// <exception cref="AvroRuntimeException"><paramref name="inStream"/> is not readable or <paramref name="outStream"/> is not writable.</exception>
+ public static IFileWriter<T> OpenAppendWriter(DatumWriter<T> writer, Stream inStream, Stream outStream)
+ {
+ // Each stream is null-checked right before its capability check, so an unreadable
+ // inStream is still reported before anything about outStream is inspected.
+ if (inStream == null)
+ {
+ throw new ArgumentNullException(nameof(inStream));
+ }
+
+ if (!inStream.CanRead)
+ {
+ throw new AvroRuntimeException($"{nameof(inStream)} must have Read access");
+ }
+
+ if (outStream == null)
+ {
+ throw new ArgumentNullException(nameof(outStream));
+ }
+
+ if (!outStream.CanWrite)
+ {
+ throw new AvroRuntimeException($"{nameof(outStream)} must have Write access");
+ }
+
+ return new DataFileWriter<T>(writer).AppendTo(inStream, outStream);
+ }
+
+ private DataFileWriter(DatumWriter<T> writer)
{
_writer = writer;
_syncInterval = DataFileConstants.DefaultSyncInterval;
@@ -205,6 +239,47 @@ namespace Avro.File
WriteIfBlockFull();
}
+ /// <summary>
+ /// Opens <paramref name="path"/> twice: once for reading the existing header and once in
+ /// append mode for the new blocks. FileShare.ReadWrite on the read handle is what allows
+ /// the second, writing handle to be opened while the first one is still open.
+ /// </summary>
+ private IFileWriter<T> AppendTo(string path)
+ {
+ using (var inStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+ {
+ var outStream = new FileStream(path, FileMode.Append);
+ try
+ {
+ // On success outStream is owned by this writer and is closed by its Close().
+ return AppendTo(inStream, outStream);
+ }
+ catch
+ {
+ // The header could not be adopted (e.g. the file is not an Avro container), so no
+ // writer takes ownership of outStream: release the handle instead of leaking it.
+ outStream.Dispose();
+ throw;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Configures this writer to continue an existing container file: schema, sync marker,
+ /// metadata and codec are taken from the header read through <paramref name="inStream"/>,
+ /// and subsequent blocks are written at the end of <paramref name="outStream"/>.
+ /// </summary>
+ private IFileWriter<T> AppendTo(Stream inStream, Stream outStream)
+ {
+ using (var headerSource = DataFileReader<T>.OpenReader(inStream))
+ {
+ var existingHeader = headerSource.GetHeader();
+ _schema = existingHeader.Schema;
+ _syncData = existingHeader.SyncData;
+ _metaData = existingHeader.MetaData;
+ }
+
+ // Reuse the codec recorded in the file; a file without a codec entry is treated as uncompressed.
+ _codec = _metaData.TryGetValue(DataFileConstants.MetaDataCodec, out byte[] codecBytes)
+ ? Codec.CreateCodecFromString(System.Text.Encoding.UTF8.GetString(codecBytes))
+ : Codec.CreateCodec(Codec.Type.Null);
+
+ // The file already starts with a header, so EnsureHeader must not emit another one.
+ _headerWritten = true;
+ _stream = outStream;
+ _stream.Seek(0, SeekOrigin.End);
+
+ Init();
+ return this;
+ }
+
private void EnsureHeader()
{
if (!_headerWritten)
diff --git a/lang/csharp/src/apache/main/GlobalSuppressions.cs b/lang/csharp/src/apache/main/GlobalSuppressions.cs
index ea2dcb1..64b5fb8 100644
--- a/lang/csharp/src/apache/main/GlobalSuppressions.cs
+++ b/lang/csharp/src/apache/main/GlobalSuppressions.cs
@@ -20,6 +20,7 @@
// attributes that are applied to this project.
// Project-level suppressions either have no target or are given
// a specific target and scoped to a namespace, type, member, etc.
+
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.OpenReader(System.IO.Stream)~Avro.File.IFileReader{`0}")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.OpenReader(System.IO.Stream,System.Boolean)~Avro.File.IFileReader{`0}")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.OpenReader(System.IO.Stream,Avro.Schema)~Avro.File.IFileReader{`0}")]
@@ -34,6 +35,8 @@
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenWriter(Avro.Generic.DatumWriter{`0},System.IO.Stream,Avro.File.Codec,System.Boolean)~Avro.File.IFileWriter{`0}")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenWriter(Avro.Generic.DatumWriter{`0},System.String)~Avro.File.IFileWriter{`0}")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenWriter(Avro.Generic.DatumWriter{`0},System.String,Avro.File.Codec)~Avro.File.IFileWriter{`0}")]
+[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenAppendWriter(Avro.Generic.DatumWriter{`0},System.String)~Avro.File.IFileWriter{`0}")]
+[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1000:Do not declare static members on generic types", Justification = "Maintain public API", Scope = "member", Target = "~M:Avro.File.DataFileWriter`1.OpenAppendWriter(Avro.Generic.DatumWriter{`0},System.IO.Stream,System.IO.Stream)~Avro.File.IFileWriter{`0}")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Purposely catch any exception", Scope = "member", Target = "~M:Avro.File.DataFileReader`1.Sync(System.Int64)")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Purposely catch any exception", Scope = "member", Target = "~M:Avro.Specific.ObjectCreator.FindType(System.String)~System.Type")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Purposely catch any exception", Scope = "member", Target = "~M:Avro.Specific.ObjectCreator.GetType(Avro.Schema)~System.Type")]
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index d748391..9229bf4 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -17,15 +17,14 @@
*/
using System;
using System.Collections;
-using System.IO;
using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using Avro.File;
using Avro.Generic;
-using NUnit.Framework;
using Avro.Specific;
-using System.Reflection;
-using Avro.File;
-using System.Linq;
-using System.IO.Compression;
+using NUnit.Framework;
namespace Avro.Test.File
{
@@ -97,6 +96,72 @@ namespace Avro.Test.File
}
/// <summary>
+ /// Test appending of specific (custom) record objects: the initial records are written to a
+ /// new file, the remaining records are added through an append writer, and the whole file
+ /// must then read back as the concatenation of both lists, in order.
+ /// </summary>
+ /// <param name="schemaStr">schema</param>
+ /// <param name="recs">initial records</param>
+ /// <param name="appendRecs">append records</param>
+ /// <param name="codecType">initial compression codec type</param>
+ [TestCase(specificSchema, new object[] { new object[] { "John", 23 } }, new object[] { new object[] { "Jane", 21 } }, Codec.Type.Deflate, TestName = "TestAppendSpecificData0")]
+ [TestCase(specificSchema, new object[] { new object[] { "John", 23 } }, new object[] { new object[] { "Jane", 21 } }, Codec.Type.Null, TestName = "TestAppendSpecificData1")]
+ [TestCase(specificSchema, new object[] { new object[] {"John", 23}, new object[] { "Jane", 99 }, new object[] { "Jeff", 88 },
+ new object[] {"James", 13}, new object[] { "June", 109 }, new object[] { "Lloyd", 18 },
+ new object[] {"Jenny", 3}, new object[] { "Bob", 9 }, new object[] { null, 48 }},
+ new object[] { new object[] { "Hillary", 79 },
+ new object[] { "Grant", 88 } }, Codec.Type.Deflate, TestName = "TestAppendSpecificData2")]
+ [TestCase(specificSchema, new object[] { new object[] {"John", 23}, new object[] { "Jane", 99 }, new object[] { "Jeff", 88 },
+ new object[] {"James", 13}, new object[] { "June", 109 }, new object[] { "Lloyd", 18 },
+ new object[] {"Jenny", 3}, new object[] { "Bob", 9 }, new object[] { null, 48 }},
+ new object[] { new object[] { "Hillary", 79 },
+ new object[] { "Grant", 88 } }, Codec.Type.Null, TestName = "TestAppendSpecificData3")]
+ public void TestAppendSpecificData(string schemaStr, object[] recs, object[] appendRecs, Codec.Type codecType)
+ {
+ IList<Foo> records = MakeRecords(recs);
+ IList<Foo> appendRecords = MakeRecords(appendRecs);
+ IList<Foo> allRecords = records.Concat(appendRecords).ToList();
+ Schema schema = Schema.Parse(schemaStr);
+
+ foreach (var rwFactory in SpecificOptions<Foo>())
+ {
+ // create and write out
+ MemoryStream dataFileOutputStream = new MemoryStream();
+ using (IFileWriter<Foo> dataFileWriter = rwFactory.CreateWriter(dataFileOutputStream, schema, Codec.CreateCodec(codecType)))
+ {
+ foreach (Foo rec in records)
+ dataFileWriter.Append(rec);
+ }
+
+ // append records. The header is read from a read-only view over the bytes, while the
+ // writer gets its own resizable copy: a MemoryStream built over a byte[] cannot grow.
+ byte[] outputData = dataFileOutputStream.ToArray();
+ MemoryStream dataFileAppendInputStream = new MemoryStream(outputData, false);
+ MemoryStream dataFileAppendStream = new MemoryStream();
+ dataFileAppendStream.Write(outputData, 0, outputData.Length);
+
+ using (IFileWriter<Foo> appendFileWriter = rwFactory.CreateAppendWriter(dataFileAppendInputStream, dataFileAppendStream, schema))
+ {
+ foreach (Foo rec in appendRecords)
+ appendFileWriter.Append(rec);
+ }
+
+ MemoryStream dataFileInputStream = new MemoryStream(dataFileAppendStream.ToArray());
+
+ // read back
+ IList<Foo> readRecords = new List<Foo>();
+
+ using (IFileReader<Foo> reader = rwFactory.CreateReader(dataFileInputStream, null))
+ {
+ foreach (Foo rec in reader.NextEntries)
+ readRecords.Add(rec);
+ }
+
+ // compare objects via their ToString() representation, preserving order
+ Assert.AreEqual(allRecords.Count, readRecords.Count);
+ for (int i = 0; i < allRecords.Count; i++)
+ Assert.AreEqual(allRecords[i].ToString(), readRecords[i].ToString());
+ }
+ }
+
+ /// <summary>
/// Reading & writing of generic record objects
/// </summary>
/// <param name="schemaStr"></param>
@@ -200,6 +265,82 @@ namespace Avro.Test.File
}
/// <summary>
+ /// Test appending of generic record objects: one record is written to a new file, a second
+ /// record is added through an append writer, and both must read back unchanged and in order.
+ /// </summary>
+ /// <param name="schemaStr">schema</param>
+ /// <param name="recs">field name/value pairs of the initial record</param>
+ /// <param name="appendRecs">field name/value pairs of the appended record</param>
+ /// <param name="codecType">initial compression codec type</param>
+ [TestCase("{\"type\":\"record\", \"name\":\"n\", \"fields\":[{\"name\":\"f1\", \"type\":\"boolean\"}]}",
+ new object[] { "f1", true }, new object[] { "f1", false }, Codec.Type.Deflate)]
+ [TestCase("{\"type\":\"record\", \"name\":\"n\", \"fields\":[{\"name\":\"f1\", \"type\":\"int\"}]}",
+ new object[] { "f1", 1 }, new object[] { "f1", 2 }, Codec.Type.Null)]
+ public void TestAppendGenericData(string schemaStr, object[] recs, object[] appendRecs, Codec.Type codecType)
+ {
+ RecordSchema schema = (RecordSchema)Schema.Parse(schemaStr);
+ var initialRecord = mkRecord(recs, schema);
+ var appendedRecord = mkRecord(appendRecs, schema);
+
+ foreach (var rwFactory in GenericOptions<GenericRecord>())
+ {
+ // Create and write out
+ MemoryStream dataFileOutputStream = new MemoryStream();
+ using (var writer = rwFactory.CreateWriter(dataFileOutputStream, schema, Codec.CreateCodec(codecType)))
+ {
+ writer.Append(initialRecord);
+ }
+
+ // append records. The header is read from a read-only view over the bytes, while the
+ // writer gets its own resizable copy: a MemoryStream built over a byte[] cannot grow.
+ byte[] outputData = dataFileOutputStream.ToArray();
+ MemoryStream dataFileAppendInputStream = new MemoryStream(outputData, false);
+ MemoryStream dataFileAppendStream = new MemoryStream();
+ dataFileAppendStream.Write(outputData, 0, outputData.Length);
+
+ using (var appendFileWriter = rwFactory.CreateAppendWriter(dataFileAppendInputStream, dataFileAppendStream, schema))
+ {
+ appendFileWriter.Append(appendedRecord);
+ }
+
+ MemoryStream dataFileInputStream = new MemoryStream(dataFileAppendStream.ToArray());
+
+ // Read back
+ IList<GenericRecord> readFoos = new List<GenericRecord>();
+ using (IFileReader<GenericRecord> reader = rwFactory.CreateReader(dataFileInputStream, null))
+ {
+ foreach (GenericRecord foo in reader.NextEntries)
+ {
+ readFoos.Add(foo);
+ }
+ }
+
+ // recs/appendRecs each describe ONE record as field pairs, so exactly two records are
+ // expected regardless of how many fields the schema has.
+ Assert.AreEqual(2, readFoos.Count, "Expected the initial record followed by the appended record");
+ Assert.AreEqual(initialRecord, readFoos[0], "Initial record did not serialise/deserialise correctly");
+ Assert.AreEqual(appendedRecord, readFoos[1], "Appended record did not serialise/deserialise correctly");
+ }
+ }
+
+ [Test]
+ public void OpenAppendWriter_IncorrectInStream_Throws()
+ {
+ // A DeflateStream opened in Compress mode is write-only (CanRead == false), which makes it
+ // a convenient stand-in for any input stream that cannot be read.
+ using (MemoryStream compressedStream = new MemoryStream())
+ using (DeflateStream dataFileInputStream = new DeflateStream(compressedStream, CompressionMode.Compress))
+ {
+ TestDelegate action = () => DataFileWriter<Foo>.OpenAppendWriter(null, dataFileInputStream, null);
+
+ Assert.Throws<AvroRuntimeException>(action);
+ }
+ }
+
+ [Test]
+ public void OpenAppendWriter_IncorrectOutStream_Throws()
+ {
+ // A closed MemoryStream reports CanWrite == false, so it must be rejected as the output.
+ var readableInput = new MemoryStream();
+ var closedOutput = new MemoryStream();
+ closedOutput.Close();
+
+ Assert.Throws<AvroRuntimeException>(() => DataFileWriter<Foo>.OpenAppendWriter(null, readableInput, closedOutput));
+ }
+
+ /// <summary>
/// This test is a single test case of
/// <see cref="TestGenericData(string, object[], Codec.Type)"/> but introduces a
/// DeflateStream as it is a standard non-seekable Stream that has the same behavior as the
@@ -678,7 +819,7 @@ namespace Avro.Test.File
}
}
- private bool CheckPrimitive<T>(Stream input, T value, ReaderWriterPair<T>.ReaderFactory createReader)
+ private bool CheckPrimitive<T>(Stream input, T value, ReaderWriterSet<T>.ReaderFactory createReader)
{
IFileReader<T> reader = createReader(input, null);
IList<T> readFoos = new List<T>();
@@ -803,49 +944,59 @@ namespace Avro.Test.File
new object[] {"Jenny", 3}, new object[] { "Bob", 9 }, new object[] { null, 48 }};
}
- private static IEnumerable<ReaderWriterPair<T>> SpecificOptions<T>()
+ /// <summary>
+ /// Factory sets for the specific API. The first pairs SpecificWriter with a reader opened
+ /// without an explicit datum-reader factory; the second pairs SpecificDatumWriter with an
+ /// explicit SpecificDatumReader factory. In CreateAppendWriter the schema argument only
+ /// builds the datum writer; the appended file keeps the schema stored in its own header.
+ /// </summary>
+ private static IEnumerable<ReaderWriterSet<T>> SpecificOptions<T>()
{
- yield return new ReaderWriterPair<T>
- {
- CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema),
- CreateWriter = (stream, schema, codec) =>
- DataFileWriter<T>.OpenWriter(new SpecificWriter<T>(schema), stream, codec )
- };
-
- yield return new ReaderWriterPair<T>
+ yield return new ReaderWriterSet<T>
+ {
+ CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema),
+ CreateWriter = (stream, schema, codec) =>
+ DataFileWriter<T>.OpenWriter(new SpecificWriter<T>(schema), stream, codec),
+ CreateAppendWriter = (inStream, outStream, schema) =>
+ DataFileWriter<T>.OpenAppendWriter(new SpecificWriter<T>(schema), inStream, outStream)
+ };
+
+ yield return new ReaderWriterSet<T>
{
CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema,
(ws, rs) => new SpecificDatumReader<T>(ws, rs)),
CreateWriter = (stream, schema, codec) =>
- DataFileWriter<T>.OpenWriter(new SpecificDatumWriter<T>(schema), stream, codec )
- };
+ DataFileWriter<T>.OpenWriter(new SpecificDatumWriter<T>(schema), stream, codec ),
+ CreateAppendWriter = (inStream, outStream, schema) =>
+ DataFileWriter<T>.OpenAppendWriter(new SpecificDatumWriter<T>(schema), inStream, outStream)
+ };
}
- private static IEnumerable<ReaderWriterPair<T>> GenericOptions<T>()
+ /// <summary>
+ /// Factory sets for the generic API. The first pairs GenericWriter with a reader opened
+ /// without an explicit datum-reader factory; the second pairs GenericDatumWriter with an
+ /// explicit GenericDatumReader factory. In CreateAppendWriter the schema argument only
+ /// builds the datum writer; the appended file keeps the schema stored in its own header.
+ /// </summary>
+ private static IEnumerable<ReaderWriterSet<T>> GenericOptions<T>()
{
- yield return new ReaderWriterPair<T>
+ yield return new ReaderWriterSet<T>
{
CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema),
CreateWriter = (stream, schema, codec) =>
- DataFileWriter<T>.OpenWriter(new GenericWriter<T>(schema), stream, codec )
+ DataFileWriter<T>.OpenWriter(new GenericWriter<T>(schema), stream, codec ),
+ CreateAppendWriter = (inStream, outStream, schema) =>
+ DataFileWriter<T>.OpenAppendWriter(new GenericWriter<T>(schema), inStream, outStream)
};
- yield return new ReaderWriterPair<T>
+ yield return new ReaderWriterSet<T>
{
CreateReader = (stream, schema) => DataFileReader<T>.OpenReader(stream, schema,
(ws, rs) => new GenericDatumReader<T>(ws, rs)),
CreateWriter = (stream, schema, codec) =>
- DataFileWriter<T>.OpenWriter(new GenericDatumWriter<T>(schema), stream, codec )
+ DataFileWriter<T>.OpenWriter(new GenericDatumWriter<T>(schema), stream, codec ),
+ CreateAppendWriter = (inStream, outStream, schema) =>
+ DataFileWriter<T>.OpenAppendWriter(new GenericDatumWriter<T>(schema), inStream, outStream)
};
}
- class ReaderWriterPair<T>
+ /// <summary>
+ /// Bundles the factories a round-trip test needs: create a writer for a new file, open a
+ /// reader, and open an append writer over an existing file.
+ /// </summary>
+ class ReaderWriterSet<T>
{
public delegate IFileWriter<T> WriterFactory(Stream stream, Schema writerSchema, Codec codec);
public delegate IFileReader<T> ReaderFactory(Stream stream, Schema readerSchema);
+ // inStream supplies the existing file's header; outStream receives the appended blocks.
+ // In SpecificOptions/GenericOptions writerSchema is only used to build the datum writer.
+ public delegate IFileWriter<T> AppendFactory(Stream inStream, Stream outStream, Schema writerSchema);
public WriterFactory CreateWriter { get; set; }
public ReaderFactory CreateReader { get; set; }
+ // Opens an append writer; see AppendFactory for the meaning of its arguments.
+ public AppendFactory CreateAppendWriter { get; set; }
}
}