You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2023/12/08 21:50:28 UTC

(arrow) branch main updated: GH-37884: [Swift] allow reading of unaligned FlatBuffers buffers (#38635)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ad63158e74 GH-37884: [Swift] allow reading of unaligned FlatBuffers buffers (#38635)
ad63158e74 is described below

commit ad63158e74d903c263c51cd0207cf77f8aa12ede
Author: abandy <ab...@live.com>
AuthorDate: Fri Dec 8 14:50:22 2023 -0700

    GH-37884: [Swift] allow reading of unaligned FlatBuffers buffers (#38635)
    
    This PR enables the Swift readers to read from unaligned FlatBuffers buffers (fixes issue GH-37884).
    
    Reading unaligned buffers incurs a performance penalty, so developers should weigh this cost before enabling the feature.
    
    It is not currently possible to recover from an unaligned-buffer error, because it is raised as a fatalError; reading with aligned buffers first and falling back to unaligned reads is therefore not an option. FlatBuffers also has a verifier that should be able to catch this error, but it currently seems to fail on both aligned and unaligned buffers. (I tried verifying the value returned by the example Python server's get call, and verification failed even though the buffers could be read successfully.)
    * Closes: #37884
    
    Authored-by: Alva Bandy <ab...@live.com>
    Signed-off-by: Sutou Kouhei <ko...@clear-code.com>
---
 swift/Arrow/Package.swift                              |  6 +++++-
 swift/Arrow/Sources/Arrow/ArrowReader.swift            | 18 +++++++++++++-----
 .../ArrowFlight/Sources/ArrowFlight/FlightClient.swift | 11 +++++++++--
 .../ArrowFlight/Sources/ArrowFlight/FlightServer.swift |  7 +++++++
 .../Sources/ArrowFlight/RecordBatchStreamReader.swift  | 11 +++++++++--
 5 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/swift/Arrow/Package.swift b/swift/Arrow/Package.swift
index 065afe6264..946eb999c7 100644
--- a/swift/Arrow/Package.swift
+++ b/swift/Arrow/Package.swift
@@ -32,7 +32,11 @@ let package = Package(
             targets: ["Arrow"]),
     ],
     dependencies: [
-        .package(url: "https://github.com/google/flatbuffers.git", from: "23.3.3")
+        // The latest flatbuffers release, v23.5.26, was published on May 26, 2023,
+        // and therefore doesn't include the Swift unaligned-buffer changes.
+        // This can be changed back to a version tag once a new
+        // flatbuffers release is available.
+        .package(url: "https://github.com/google/flatbuffers.git", branch: "master")
     ],
     targets: [
         // Targets are the basic building blocks of a package. A target can define a module or a test suite.
diff --git a/swift/Arrow/Sources/Arrow/ArrowReader.swift b/swift/Arrow/Sources/Arrow/ArrowReader.swift
index ef995b1805..d9dc1bdb47 100644
--- a/swift/Arrow/Sources/Arrow/ArrowReader.swift
+++ b/swift/Arrow/Sources/Arrow/ArrowReader.swift
@@ -132,7 +132,8 @@ public class ArrowReader {
     }
 
     public func fromStream( // swiftlint:disable:this function_body_length
-        _ fileData: Data
+        _ fileData: Data,
+        useUnalignedBuffers: Bool = false
     ) -> Result<ArrowReaderResult, ArrowError> {
         let footerLength = fileData.withUnsafeBytes { rawBuffer in
             rawBuffer.loadUnaligned(fromByteOffset: fileData.count - 4, as: Int32.self)
@@ -141,7 +142,9 @@ public class ArrowReader {
         let result = ArrowReaderResult()
         let footerStartOffset = fileData.count - Int(footerLength + 4)
         let footerData = fileData[footerStartOffset...]
-        let footerBuffer = ByteBuffer(data: footerData)
+        let footerBuffer = ByteBuffer(
+            data: footerData,
+            allowReadingUnalignedBuffers: useUnalignedBuffers)
         let footer = org_apache_arrow_flatbuf_Footer.getRootAsFooter(bb: footerBuffer)
         let schemaResult = loadSchema(footer.schema!)
         switch schemaResult {
@@ -170,7 +173,9 @@ public class ArrowReader {
             let messageStartOffset = recordBatch.offset + (Int64(MemoryLayout<Int32>.size) * messageOffset)
             let messageEndOffset = messageStartOffset + Int64(messageLength)
             let recordBatchData = fileData[messageStartOffset ..< messageEndOffset]
-            let mbb = ByteBuffer(data: recordBatchData)
+            let mbb = ByteBuffer(
+                data: recordBatchData,
+                allowReadingUnalignedBuffers: useUnalignedBuffers)
             let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb)
             switch message.headerType {
             case .recordbatch:
@@ -219,9 +224,12 @@ public class ArrowReader {
     public func fromMessage(
         _ dataHeader: Data,
         dataBody: Data,
-        result: ArrowReaderResult
+        result: ArrowReaderResult,
+        useUnalignedBuffers: Bool = false
     ) -> Result<Void, ArrowError> {
-        let mbb = ByteBuffer(data: dataHeader)
+        let mbb = ByteBuffer(
+            data: dataHeader,
+            allowReadingUnalignedBuffers: useUnalignedBuffers)
         let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb)
         switch message.headerType {
         case .schema:
diff --git a/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift b/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
index 7a572ceca5..ef3e4fa239 100644
--- a/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
+++ b/swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
@@ -24,8 +24,11 @@ import Arrow
 
 public class FlightClient {
     let client: Arrow_Flight_Protocol_FlightServiceAsyncClient
-    public init(channel: GRPCChannel) {
+    let allowReadingUnalignedBuffers: Bool
+
+    public init(channel: GRPCChannel, allowReadingUnalignedBuffers: Bool = false ) {
         client = Arrow_Flight_Protocol_FlightServiceAsyncClient(channel: channel)
+        self.allowReadingUnalignedBuffers = allowReadingUnalignedBuffers
     }
 
     private func readMessages(
@@ -34,7 +37,11 @@ public class FlightClient {
         let reader = ArrowReader()
         let arrowResult = ArrowReader.makeArrowReaderResult()
         for try await data in responseStream {
-            switch reader.fromMessage(data.dataHeader, dataBody: data.dataBody, result: arrowResult) {
+            switch reader.fromMessage(
+                data.dataHeader,
+                dataBody: data.dataBody,
+                result: arrowResult,
+                useUnalignedBuffers: allowReadingUnalignedBuffers) {
             case .success:
                 continue
             case .failure(let error):
diff --git a/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift b/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
index a34bf5c0ac..19644d632e 100644
--- a/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
+++ b/swift/ArrowFlight/Sources/ArrowFlight/FlightServer.swift
@@ -63,6 +63,7 @@ public func schemaFromMessage(_ schemaData: Data) -> ArrowSchema? {
 }
 
 public protocol ArrowFlightServer: Sendable {
+    var allowReadingUnalignedBuffers: Bool { get }
     func listFlights(_ criteria: FlightCriteria, writer: FlightInfoStreamWriter) async throws
     func getFlightInfo(_ request: FlightDescriptor) async throws -> FlightInfo
     func getSchema(_ request: FlightDescriptor) async throws -> ArrowFlight.FlightSchemaResult
@@ -73,6 +74,12 @@ public protocol ArrowFlightServer: Sendable {
     func doExchange(_ reader: RecordBatchStreamReader, writer: RecordBatchStreamWriter) async throws
 }
 
+extension ArrowFlightServer { // Default: servers only read aligned FlatBuffers buffers.
+    public var allowReadingUnalignedBuffers: Bool { // public so conformers in other modules inherit this default
+        return false
+    }
+}
+
 public func makeFlightServer(_ handler: ArrowFlightServer) -> CallHandlerProvider {
   return InternalFlightServer(handler)
 }
diff --git a/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift b/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift
index 972d19435d..464752dbcb 100644
--- a/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift
+++ b/swift/ArrowFlight/Sources/ArrowFlight/RecordBatchStreamReader.swift
@@ -27,10 +27,13 @@ public class RecordBatchStreamReader: AsyncSequence, AsyncIteratorProtocol {
     var descriptor: FlightDescriptor?
     var batchIndex = 0
     var streamIterator: any AsyncIteratorProtocol
+    var useUnalignedBuffers: Bool
     let stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>
-    init(_ stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>) {
+    init(_ stream: GRPC.GRPCAsyncRequestStream<Arrow_Flight_Protocol_FlightData>,
+         useUnalignedBuffers: Bool = false) {
         self.stream = stream
         self.streamIterator = self.stream.makeAsyncIterator()
+        self.useUnalignedBuffers = useUnalignedBuffers
     }
 
     public func next() async throws -> (Arrow.RecordBatch?, FlightDescriptor?)? {
@@ -55,7 +58,11 @@ public class RecordBatchStreamReader: AsyncSequence, AsyncIteratorProtocol {
             let dataBody = flightData.dataBody
             let dataHeader = flightData.dataHeader
             descriptor = FlightDescriptor(flightData.flightDescriptor)
-            switch reader.fromMessage(dataHeader, dataBody: dataBody, result: result) {
+            switch reader.fromMessage(
+                dataHeader,
+                dataBody: dataBody,
+                result: result,
+                useUnalignedBuffers: useUnalignedBuffers) {
             case .success(()):
                 if result.batches.count > 0 {
                     batches = result.batches