You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/03/08 18:39:45 UTC

[arrow] branch master updated: ARROW-4718: [C#] Add ArrowStreamReader/Writer ctor with bool leaveOpen

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d548a0  ARROW-4718: [C#] Add ArrowStreamReader/Writer ctor with bool leaveOpen
8d548a0 is described below

commit 8d548a0b482f9f86568626a964c4056315ab1b2b
Author: Stephen Toub <st...@microsoft.com>
AuthorDate: Fri Mar 8 12:39:39 2019 -0600

    ARROW-4718: [C#] Add ArrowStreamReader/Writer ctor with bool leaveOpen
    
    It's often the case that data being read/written in Arrow format is part of some larger protocol. However, ArrowStreamWriter and ArrowStreamReader close the provided stream when they're disposed. This means that if you need to, for example, write a footer after the Arrow data is written, you need to actually construct the ArrowStreamWriter around some temporary stream, write to that, then copy the data from that stream to the actual one that you then write the footer to; otherwise, [...]
    Throughout .NET, the option to keep the underlying stream open after the wrapper is disposed is generally codified as a constructor that takes a "leaveOpen" Boolean parameter. This PR does the same for ArrowStreamReader/Writer, adding a constructor for each:
    ```C#
    public ArrowStreamReader(Stream stream, bool leaveOpen);
    public ArrowStreamWriter(Stream baseStream, Schema schema, bool leaveOpen);
    ```
    
    Fixes https://issues.apache.org/jira/browse/ARROW-4718
    cc: @eerhardt, @pgovind, @chutchinson
    
    Author: Stephen Toub <st...@microsoft.com>
    
    Closes #3782 from stephentoub/leaveopen and squashes the following commits:
    
    ade69082 <Stephen Toub> ARROW-4718:  Add ArrowStreamReader/Writer ctor with bool leaveOpen
---
 csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs     |  7 ++-
 .../Ipc/ArrowFileReaderImplementation.cs           |  2 +-
 csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs     |  7 ++-
 csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs   |  7 ++-
 .../Ipc/ArrowStreamReaderImplementation.cs         |  6 ++-
 csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs   | 14 +++++-
 .../Apache.Arrow.Tests/ArrowFileReaderTests.cs     | 49 ++++++++++++++++++++
 .../Apache.Arrow.Tests/ArrowFileWriterTests.cs     | 52 ++++++++++++++++++++++
 .../Apache.Arrow.Tests/ArrowStreamReaderTests.cs   | 24 ++++++++++
 .../Apache.Arrow.Tests/ArrowStreamWriterTests.cs   | 52 ++++++++++++++++++++++
 10 files changed, 212 insertions(+), 8 deletions(-)

diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
index 6d4c8da..da9525e 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
@@ -30,7 +30,12 @@ namespace Apache.Arrow.Ipc
         public bool IsFileValid => Implementation.IsFileValid;
 
         public ArrowFileReader(Stream stream)
-            : base(new ArrowFileReaderImplementation(stream))
+            : this(stream, leaveOpen: false)
+        {
+        }
+
+        public ArrowFileReader(Stream stream, bool leaveOpen)
+            : base(new ArrowFileReaderImplementation(stream, leaveOpen))
         {
         }
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
index 634ab53..1a99a40 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
@@ -40,7 +40,7 @@ namespace Apache.Arrow.Ipc
 
         private ArrowFooter _footer;
 
-        public ArrowFileReaderImplementation(Stream stream) : base(stream)
+        public ArrowFileReaderImplementation(Stream stream, bool leaveOpen) : base(stream, leaveOpen)
         {
         }
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
index 74534f1..459cb51 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
@@ -30,7 +30,12 @@ namespace Apache.Arrow.Ipc
         private List<Block> RecordBatchBlocks { get; }
 
         public ArrowFileWriter(Stream stream, Schema schema)
-            : base(stream, schema)
+            : this(stream, schema, leaveOpen: false)
+        {
+        }
+
+        public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen)
+            : base(stream, schema, leaveOpen)
         {
             if (!stream.CanWrite)
             {
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
index b297128..a399056 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
@@ -30,11 +30,16 @@ namespace Apache.Arrow.Ipc
         public Schema Schema => _implementation.Schema;
 
         public ArrowStreamReader(Stream stream)
+            : this(stream, leaveOpen: false)
+        {
+        }
+
+        public ArrowStreamReader(Stream stream, bool leaveOpen)
         {
             if (stream == null)
                 throw new ArgumentNullException(nameof(stream));
 
-            _implementation = new ArrowStreamReaderImplementation(stream);
+            _implementation = new ArrowStreamReaderImplementation(stream, leaveOpen);
         }
 
         public ArrowStreamReader(ReadOnlyMemory<byte> buffer)
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
index b07521e..36e2e57 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
@@ -25,16 +25,18 @@ namespace Apache.Arrow.Ipc
     {
         public Stream BaseStream { get; }
         protected ArrayPool<byte> Buffers { get; }
+        private readonly bool _leaveOpen;
 
-        public ArrowStreamReaderImplementation(Stream stream)
+        public ArrowStreamReaderImplementation(Stream stream, bool leaveOpen)
         {
             BaseStream = stream;
+            _leaveOpen = leaveOpen;
             Buffers = ArrayPool<byte>.Create();
         }
 
         protected override void Dispose(bool disposing)
         {
-            if (disposing)
+            if (disposing && !_leaveOpen)
             {
                 BaseStream.Dispose();
             }
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
index be768c1..b87b71b 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
@@ -149,16 +149,23 @@ namespace Apache.Arrow.Ipc
 
         protected Schema Schema { get; }
 
+        private readonly bool _leaveOpen;
+
         private protected const Flatbuf.MetadataVersion CurrentMetadataVersion = Flatbuf.MetadataVersion.V4;
 
         private static readonly byte[] Padding = new byte[64];
 
         private readonly ArrowTypeFlatbufferBuilder _fieldTypeBuilder;
 
-        public ArrowStreamWriter(Stream baseStream, Schema schema)
+        public ArrowStreamWriter(Stream baseStream, Schema schema) : this(baseStream, schema, leaveOpen: false)
+        {
+        }
+
+        public ArrowStreamWriter(Stream baseStream, Schema schema, bool leaveOpen)
         {
             BaseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
             Schema = schema ?? throw new ArgumentNullException(nameof(schema));
+            _leaveOpen = leaveOpen;
 
             Buffers = ArrayPool<byte>.Create();
             Builder = new FlatBufferBuilder(1024);
@@ -373,7 +380,10 @@ namespace Apache.Arrow.Ipc
 
         public virtual void Dispose()
         {
-            BaseStream.Dispose();
+            if (!_leaveOpen)
+            {
+                BaseStream.Dispose();
+            }
         }
     }
 }
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
new file mode 100644
index 0000000..b2b769d
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using System;
+using System.IO;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    public class ArrowFileReaderTests
+    {
+        [Fact]
+        public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
+        {
+            var stream = new MemoryStream();
+            new ArrowFileReader(stream).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
+        {
+            var stream = new MemoryStream();
+            new ArrowFileReader(stream, leaveOpen: false).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
+        {
+            var stream = new MemoryStream();
+            new ArrowFileReader(stream, leaveOpen: true).Dispose();
+            Assert.Equal(0, stream.Position);
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs
new file mode 100644
index 0000000..0796df6
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/ArrowFileWriterTests.cs
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using System;
+using System.IO;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    public class ArrowFileWriterTests
+    {
+        [Fact]
+        public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            var stream = new MemoryStream();
+            new ArrowFileWriter(stream, originalBatch.Schema).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            var stream = new MemoryStream();
+            new ArrowFileWriter(stream, originalBatch.Schema, leaveOpen: false).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            var stream = new MemoryStream();
+            new ArrowFileWriter(stream, originalBatch.Schema, leaveOpen: true).Dispose();
+            Assert.Equal(0, stream.Position);
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
index 3bd4c22..af68a85 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -25,6 +25,30 @@ namespace Apache.Arrow.Tests
     public class ArrowStreamReaderTests
     {
         [Fact]
+        public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
+        {
+            var stream = new MemoryStream();
+            new ArrowStreamReader(stream).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
+        {
+            var stream = new MemoryStream();
+            new ArrowStreamReader(stream, leaveOpen: false).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
+        {
+            var stream = new MemoryStream();
+            new ArrowStreamReader(stream, leaveOpen: true).Dispose();
+            Assert.Equal(0, stream.Position);
+        }
+
+        [Fact]
         public async Task ReadRecordBatch()
         {
             RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
new file mode 100644
index 0000000..b4cfef0
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using System;
+using System.IO;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    public class ArrowStreamWriterTests
+    {
+        [Fact]
+        public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            var stream = new MemoryStream();
+            new ArrowStreamWriter(stream, originalBatch.Schema).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenFalse_StreamClosedOnDispose()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            var stream = new MemoryStream();
+            new ArrowStreamWriter(stream, originalBatch.Schema, leaveOpen: false).Dispose();
+            Assert.Throws<ObjectDisposedException>(() => stream.Position);
+        }
+
+        [Fact]
+        public void Ctor_LeaveOpenTrue_StreamValidOnDispose()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            var stream = new MemoryStream();
+            new ArrowStreamWriter(stream, originalBatch.Schema, leaveOpen: true).Dispose();
+            Assert.Equal(0, stream.Position);
+        }
+    }
+}