You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/08/28 17:48:04 UTC

[beam] 21/22: Changes to allow the Swift SDK to operate successfully with the Flink Portable Runner as well as the Python Portable Runner. Modified the PInput/POutput functionality to just be structs; this allows us to use them for both closures and the eventual DoFn interface, and cleans up the function signatures.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c13e652d2a4051e9d846d9dd31cbd0fc3447bdd8
Author: Byron Ellis <by...@google.com>
AuthorDate: Fri Aug 25 13:41:43 2023 -0700

    Changes to allow the Swift SDK to operate successfully with the Flink Portable Runner as well as the Python Portable Runner. Modified the PInput/POutput functionality to just be structs; this allows us to use them for both closures and the eventual DoFn interface, and cleans up the function signatures.
---
 .../Sources/ApacheBeam/Coders/Coder+Decoding.swift |  8 ++-
 .../Sources/ApacheBeam/Coders/Coder+Encoding.swift |  6 +-
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift   | 29 ++++++++-
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        |  2 -
 .../ApacheBeam/Internal/Data+Decoding.swift        |  7 ++-
 .../ApacheBeam/Internal/Data+Encoding.swift        |  6 ++
 .../ApacheBeam/Internal/Date+Timestamp.swift       |  2 +-
 .../Runtime/Bundle/BundleProcessor.swift           | 19 ++++--
 .../Sources/ApacheBeam/Runtime/Bundle/Sink.swift   |  1 +
 .../Sources/ApacheBeam/Runtime/Bundle/Source.swift | 14 ++++-
 .../ApacheBeam/Runtime/DataplaneClient.swift       | 41 +++++++++++--
 .../Sources/ApacheBeam/Runtime/Worker/Worker.swift | 11 +++-
 .../ApacheBeam/Transforms/BuiltIn+Elements.swift   | 71 +++-------------------
 .../Sources/ApacheBeam/Transforms/Combining.swift  |  3 +-
 .../ApacheBeamTests/Pipeline/FileIOTests.swift     |  9 +--
 .../Pipeline/IntegrationTests.swift                |  2 +-
 16 files changed, 137 insertions(+), 94 deletions(-)

diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
index 9eef4113576..043b6690981 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
@@ -52,8 +52,10 @@ public extension Coder {
             let length = try data.next(Int32.self).byteSwapped
             return .array(try (0..<length).map({ _ in try coder.decode(&data) }))
         case let .windowedvalue(valueCoder, windowCoder):
-            let timestamp = try data.next(Int64.self).byteSwapped &+ Int64(-9223372036854775808)
-            let windowCount = try data.next(Int32.self).byteSwapped
+            // This will be big endian to match java
+            let timestamp = try data.instant()
+
+            let windowCount = Int32(bigEndian: try data.next(Int32.self))
             if windowCount > 1 {
                 throw ApacheBeamError.runtimeError("Windowed values with > 1 window not yet supported")
             }
@@ -72,7 +74,7 @@ public extension Coder {
             default:
                 throw ApacheBeamError.runtimeError("Invalid pane encoding \(String(pane,radix:2))")
             }
-            return .windowed(try valueCoder.decode(&data), Date(millisecondsSince1970: timestamp), pane, window)
+            return .windowed(try valueCoder.decode(&data), timestamp, pane, window)
         default:
             throw ApacheBeamError.runtimeError("Decoding of \(self.urn) coders not supported.")
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
index fffeb351a53..5076275dffc 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
@@ -90,12 +90,12 @@ public extension Coder {
         case let .windowedvalue(valueCoder, windowCoder):
             if let (v,ts,w) = value as? (Any,Date,Window) {
                 //Timestamp
-                data.next( (ts.millisecondsSince1970 &- Int64(-9223372036854775808)).bigEndian )
+                data.instant(ts)
                 switch w {
                 case .global:
-                    data.next(Int32(0))
+                    data.next(Int32(1).bigEndian)
                 default:
-                    data.next(Int32(1))
+                    data.next(Int32(1).bigEndian)
                     try windowCoder.encode(w,data:&data)
                 }
                 // TODO: Real Panes
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
index 713b1de6796..28b809e5c53 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import Logging
+import Foundation
 
 /// A higher level interface to SerializableFn using dependency injected dynamic properties in the same
 /// way as we define Composite PTransforms
@@ -29,4 +29,31 @@ public extension DoFn {
     func finishBundle() async throws { }
 }
 
+/// An element delivered to a `pardo` closure together with its timestamp and window.
+public struct PInput<Of> {
+    public let value: Of
+    public let timestamp: Date
+    public let window: Window
+
+    /// Builds an input from its three separate components.
+    public init(_ value: Of,_ timestamp: Date,_ window: Window) {
+        self.window = window
+        self.timestamp = timestamp
+        self.value = value
+    }
+
+    /// Builds an input from a `(value, timestamp, window)` tuple.
+    public init(_ element: (Of,Date,Window)) {
+        self.init(element.0, element.1, element.2)
+    }
+}
 
+/// Output handle given to `pardo` closures; it remembers the timestamp and window of the element being processed.
+public struct POutput<Of> {
+    let stream: PCollectionStream<Of>
+    let timestamp: Date
+    let window: Window
+    public func emit(_ value: Of,timestamp: Date? = nil,window: Window? = nil) { // public so closures outside the module can emit
+        stream.emit(value,timestamp: timestamp ?? self.timestamp,window: window ?? self.window) // nil falls back to the stored value
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index 825680cfa4c..34e404eca83 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -211,7 +211,6 @@ public final class Pipeline {
                                         $0.spec = .with {
                                             $0.urn = .transformUrn("impulse")
                                         }
-                                        $0.environmentID = defaultEnvironment.name
                                     }
                             }
                             rootIds.append(p.name)
@@ -232,7 +231,6 @@ public final class Pipeline {
                                         $0.spec = .with {
                                             $0.urn = .transformUrn("group_by_key")
                                         }
-                                        $0.environmentID = defaultEnvironment.name
                                     }
                             }
                             rootIds.append(p.name)
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
index 978898fb51f..6487063e0e3 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
@@ -68,6 +68,11 @@ extension Data {
         return result
     }
     
+    /// Reads an 8-byte big-endian integer, adds `Int64.min` (wrapping) and treats the result as milliseconds since 1970.
+    mutating func instant() throws -> Date {
+        return try Date(millisecondsSince1970: Int64(bigEndian: next(Int64.self)) &+ Int64.min)
+    }
+    
     /// Read a length prefixed chunk of data
     mutating func subdata() throws -> Data {
         let length = try self.varint()
@@ -80,7 +85,7 @@ extension Data {
     mutating func next<T>(_ type: T.Type) throws -> T {
         let size = MemoryLayout<T>.size
         let result = self.withUnsafeBytes {
-            $0.baseAddress!.withMemoryRebound(to: T.self, capacity: 1) { $0.pointee }
+            $0.load(as: T.self)
         }
         self = self.safeAdvance(by: size)
         return result
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
index 86007b21104..1a947162251 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
@@ -27,6 +27,12 @@ extension Data {
         self.append(UInt8(current))
     }
     
+    /// Appends `value` as milliseconds since 1970, offset by `Int64.min` (wrapping), in big-endian byte order.
+    mutating func instant(_ value: Date) {
+        let shifted = (Int64(value.millisecondsSince1970) &- Int64.min).bigEndian
+        Swift.withUnsafeBytes(of: shifted) { self.append(contentsOf: $0) }
+    }
+    
     mutating func next<T>(_ value: T) {
         Swift.withUnsafeBytes(of:value) {
             self.append(contentsOf: $0)
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
index 16eb658d050..9c3424584ef 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
@@ -22,6 +22,6 @@ extension Date {
         Int64((self.timeIntervalSince1970 * 1000.0).rounded())
     }
     init(millisecondsSince1970: Int64) {
-        self = Date(timeIntervalSince1970: TimeInterval(millisecondsSince1970) / 1000)
+        self = Date(timeIntervalSince1970: Double(millisecondsSince1970) / 1000.0)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index 4e977d6f112..1e46b3e723d 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -37,7 +37,7 @@ struct BundleProcessor {
          descriptor:Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
          collections: [String:AnyPCollection],
          fns: [String:SerializableFn]) throws {
-        self.log = Logging.Logger(label: "BundleProcessor(\(descriptor.id))")
+        self.log = Logging.Logger(label: "BundleProcessor(\(id) \(descriptor.id))")
         
         var temp: [Step] = []
         var coders =  BundleCoderContainer(bundle:descriptor)
@@ -60,7 +60,7 @@ struct BundleProcessor {
         
         
         
-        for (_,transform) in descriptor.transforms {
+        for (transformId,transform) in descriptor.transforms {
             let urn = transform.spec.urn
             //Map the input and output streams in the correct order
             let inputs = transform.inputs.sorted().map { streams[$0.1]! }
@@ -69,8 +69,9 @@ struct BundleProcessor {
             if urn == "beam:runner:source:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                log.info("Source '\(transformId)','\(transform.uniqueName)' \(remotePort) \(coder)")
                 temp.append(Step(
-                    transformId: transform.uniqueName,
+                    transformId: transform.uniqueName == "" ? transformId : transform.uniqueName,
                     fn:Source(client: try .client(for: ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), coder: coder),
                     inputs:inputs,
                     outputs:outputs,
@@ -79,8 +80,9 @@ struct BundleProcessor {
             } else if urn == "beam:runner:sink:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                log.info("Sink '\(transformId)','\(transform.uniqueName)' \(remotePort) \(coder)")
                 temp.append(Step(
-                    transformId: transform.uniqueName,
+                    transformId: transform.uniqueName == "" ? transformId : transform.uniqueName,
                     fn:Sink(client: try .client(for: ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), coder: coder),
                     inputs:inputs,
                     outputs:outputs,
@@ -108,16 +110,23 @@ struct BundleProcessor {
     
     public func process(instruction: String,responder: AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation) async {
         _ = await withThrowingTaskGroup(of: (String,String).self) { group in
+            log.info("Starting bundle processing for \(instruction)")
+            var count: Int = 0
             do {
                 for step in self.steps {
+                    log.info("Starting Task \(step.transformId)")
                     let context = SerializableFnBundleContext(instruction: instruction, transform: step.transformId, payload: step.payload, log: self.log)
                     group.addTask {
                         return try await step.fn.process(context: context, inputs: step.inputs, outputs: step.outputs)
                     }
+                    count += 1
                 }
+                var finished: Int = 0
                 for try await (instruction,transform) in group {
-                    log.info("Task Completed (\(instruction),\(transform))")
+                    finished += 1
+                    log.info("Task Completed (\(instruction),\(transform)) \(finished) of \(count)")
                 }
+                log.info("All tasks completed for \(instruction)")
                 responder.yield(.with {
                     $0.instructionID = instruction
                 })
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
index 19ad99c7d99..620a6758261 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
@@ -39,6 +39,7 @@ final class Sink : SerializableFn {
         }
         emitter.yield(.last(context.instruction, context.transform))
         emitter.finish()
+        await client.finalizeStream(instruction: context.instruction, transform: context.transform)
         return (context.instruction,context.transform)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
index fd2888ad027..90d28057f48 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
@@ -17,25 +17,32 @@
  */
 
 import Foundation
+import Logging
 
 /// Custom SerializableFn that reads/writes from an external data stream using a defined coder. It assumes that a given
 /// data element might contain more than one coder
 final class Source : SerializableFn {
-
+    
     let client: DataplaneClient
     let coder: Coder
+    let log: Logger
     
     public init(client: DataplaneClient,coder:Coder) {
         self.client = client
         self.coder = coder
-
+        self.log = Logger(label:"Source")
     }
     
     
     func process(context: SerializableFnBundleContext,
                  inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
+        log.info("Waiting for input on \(context.instruction)-\(context.transform)")
         let (stream,_) = await client.makeStream(instruction: context.instruction, transform: context.transform)
+        
+        var messages: Int = 0
+        var count: Int = 0
         for await message in stream {
+            messages += 1
             switch message {
             case let .data(data):
                 var d = data
@@ -43,6 +50,7 @@ final class Source : SerializableFn {
                     let value = try coder.decode(&d)
                     for output in outputs {
                         try output.emit(value: value)
+                        count += 1
                     }
                 }
             case let .last(id, transform):
@@ -50,9 +58,11 @@ final class Source : SerializableFn {
                     output.finish()
                 }
                 await client.finalizeStream(instruction: id, transform: transform)
+                log.info("Source \(context.instruction),\(context.transform) handled \(count) items over \(messages) messages")
                 return (id,transform)
             //TODO: Handle timer messages
             default:
+                log.info("Unhanled message \(message)")
                 break
             }
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
index da820e0675b..254842e2d9d 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
@@ -67,7 +67,7 @@ actor DataplaneClient {
     private var streams: [Pair:(Stream,Stream.Continuation,MultiplexContinuation)] = [:]
     private let flush: Int
 
-    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=100) throws {
+    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=1000) throws {
         self.id  = id
         self.log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))")
         self.multiplex = AsyncStream.makeStream(of:Multiplex.self)
@@ -80,7 +80,9 @@ actor DataplaneClient {
             log.info("Initiating data plane multiplexing.")
             
             let input = multiplex.0
-
+            var count: Int = 0
+            var flushes: Int = 0
+            
             var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
             for try await element in input {
                 var shouldFlush: Bool = false
@@ -92,6 +94,7 @@ actor DataplaneClient {
                         $0.transformID = element.transform
                         $0.data = payload
                     })
+                    count += 1
                 case let .timer(family, payload):
                     elements.timers.append(.with {
                         $0.instructionID = element.id
@@ -99,6 +102,7 @@ actor DataplaneClient {
                         $0.timerFamilyID = family
                         $0.timers = payload
                     })
+                    count += 1
                 case let .last(id, transform):
                     elements.data.append(.with {
                         $0.instructionID = id
@@ -106,19 +110,36 @@ actor DataplaneClient {
                         $0.isLast = true
                     })
                     shouldFlush = true
+                    count += 1
                 case .flush:
                     shouldFlush = true
                 }
                 if shouldFlush || elements.data.count + elements.timers.count >= flush {
                     do {
+                        if case .last = element.message {
+                            log.info("Got last message, flushing \(elements.data.count + elements.timers.count) elements to data plane")
+                        }
                         try await stream.requestStream.send(elements)
                     } catch {
                         log.error("Unable to multiplex elements onto data plane: \(error)")
                     }
                     elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
                     shouldFlush = false
+                    flushes += 1
+                }
+                if count % 50000 == 0 && count > 0 {
+                    log.info("Processed \(count) elements (\(flushes) flushes)")
                 }
             }
+            if(elements.data.count + elements.timers.count > 0) {
+                do {
+                    log.info("Flushing final elements to data plane.")
+                    try await stream.requestStream.send(elements)
+                } catch {
+                    log.error("Unable to multiplex final elements onto data plane: \(error)")
+                }
+            }
+            log.info("Shutting down dataplane multiplexing")
         }
 
         // Demux task
@@ -132,7 +153,6 @@ actor DataplaneClient {
                     
                     for element in elements.data {
                         let key = Pair(id: element.instructionID, transform: element.transformID)
-                        //Drop zero-length elements
                         if element.data.count > 0 {
                             messages[key,default:[]].append(.data(element.data))
                         }
@@ -159,15 +179,20 @@ actor DataplaneClient {
                         }
                     }
                     // Send any last messages
+                    // TODO: Fix known race here. We try to re-use streams across bundles which can lead to a race where yield is sent too early.
                     for (key,value) in last {
                         let output = await self.makeStream(key: key).1
                         output.yield(value)
                     }
                 }
             } catch {
-                log.error("Lost data plane connection.")
+                log.error("Lost data plane connection. Closing all outstanding streams")
+                for (id,stream) in await streams {
+                    log.info("Closing stream \(id)")
+                    stream.1.finish()
+                }
             }
-            
+            multiplex.1.finish()
         }
     }
     
@@ -192,7 +217,11 @@ actor DataplaneClient {
     }
     
     func finalizeStream(instruction:String,transform:String) {
-        //TODO: Implement finalization.
+        let key = Pair(id:instruction,transform:transform)
+        log.info("Done with stream \(key)")
+        if let element = streams.removeValue(forKey: key) {
+            element.1.finish()
+        }
     }
 
     
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index 08204a11184..1a19f930f2c 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -52,6 +52,7 @@ actor Worker {
         //Start the response task. This will continue until a yield call is sent from responder
         Task {
             for await r in responses {
+                log.info("Sending response \(r)")
                 try await control.requestStream.send(r)
             }
         }
@@ -75,8 +76,14 @@ actor Worker {
             for try await instruction in control.responseStream {
                 switch instruction.request {
                 case .processBundle(let pbr):
-                    try await processor(for:pbr.processBundleDescriptorID)
-                        .process(instruction: instruction.instructionID,responder:responder)
+                    do {
+                        let p = try await processor(for:pbr.processBundleDescriptorID)
+                        Task {
+                            await p.process(instruction: instruction.instructionID,responder:responder)
+                        }
+                    } catch {
+                        log.error("Unable to process bundle \(pbr.processBundleDescriptorID): \(error)")
+                    }
                     break
                 default:
                     log.warning("Ignoring instruction \(instruction.instructionID). Not yet implemented.")
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
index e29ea085b7d..7febd84895a 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
@@ -18,87 +18,34 @@
 
 import Foundation
 
-public protocol PInput<Of> {
-    associatedtype Of
-    
-    var value: Of { get }
-    var timestamp: Date { get }
-    var window: Window { get }
-}
-
-public protocol POutput<Of> {
-    associatedtype Of
-    
-    func emit(_ value: Of,timestamp: Date,window: Window)
-    func emit(_ value: Of)
-    func emit(_ value: Of,timestamp: Date)
-    func emit(_ value: Of,window: Window)
-}
-
-
-struct PardoInput<Of> : PInput {
-    let value: Of
-    let timestamp: Date
-    let window: Window
-    public init(_ value:(Of,Date,Window)) {
-        self.value = value.0
-        self.timestamp = value.1
-        self.window = value.2
-    }
-}
-
-struct PardoOutput<Of> : POutput {
-    
-    
-    let stream: PCollectionStream<Of>
-    let timestamp: Date
-    let window: Window
-
-    func emit(_ value: Of, timestamp: Date, window: Window) {
-        stream.emit(value,timestamp:timestamp,window:window)
-    }
-    func emit(_ value: Of, timestamp: Date) {
-        stream.emit(value,timestamp:timestamp,window:window)
-
-    }    
-    func emit(_ value: Of, window: Window) {
-        stream.emit(value,timestamp:timestamp,window:window)
-
-    }
-    func emit(_ value: Of) {
-        stream.emit(value,timestamp:timestamp,window:window)
-    }
-
-    
-}
-
 public extension PCollection {
     
     // No Output
-    func pardo(name:String,_ fn: @Sendable @escaping (any PInput<Of>) async throws -> Void) {
+    func pardo(name:String,_ fn: @Sendable @escaping (PInput<Of>) async throws -> Void) {
         pstream(name:name) { input in
             for try await element in input {
-                try await fn(PardoInput(element))
+                try await fn(PInput(element))
             }
         }
     }
     
     // One Output
-    func pardo<O0>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any POutput<O0>) async throws -> Void) -> PCollection<O0> {
+    func pardo<O0>(name:String,_ fn: @Sendable @escaping (PInput<Of>,POutput<O0>) async throws -> Void) -> PCollection<O0> {
         pstream(name:name) { input,output in
             for try await element in input {
-                try await fn(PardoInput(element),PardoOutput(stream:output,timestamp:element.1,window:element.2))
+                try await fn(PInput(element),
+                             POutput(stream:output,timestamp:element.1,window:element.2))
             }
         }
     }
     
     //Two Outputs
-    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any POutput<O0>,any POutput<O1>) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
+    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (PInput<Of>,POutput<O0>,POutput<O1>) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         pstream(name:name) { input,o0,o1 in
             for try await element in input {
-                try await fn(PardoInput(element),
-                             PardoOutput(stream:o0,timestamp:element.1,window:element.2),
-                             PardoOutput(stream:o1,timestamp:element.1,window:element.2)
+                try await fn(PInput(element),
+                             POutput(stream:o0,timestamp:element.1,window:element.2),
+                             POutput(stream:o1,timestamp:element.1,window:element.2)
                              
                 )
             }
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
index 7abd847b8b5..a03f3e6ec11 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -25,7 +25,8 @@ public extension PCollection {
                 for v in kv.values {
                     accumulator(v,&result)
                 }
-                output.emit(KV(kv.key,result),timestamp:ts,window:w)
+                let intermediate = KV(kv.key,result)
+                output.emit(intermediate,timestamp:ts,window:w)
             }
                 
         }
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
index b4ffa125b08..142f264bd99 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -30,8 +30,8 @@ final class FileIOTests: XCTestCase {
     }
 
     func testGoogleStorageReadFiles() async throws {
+        throw XCTSkip()
         try await PCollectionTest(PCollection<KV<String,String>>().readFiles(in: GoogleStorage.self)) { log,inputs,outputs in
-            throw XCTSkip()
             log.info("Sending value")
             try inputs[0].emit(value:KV("dataflow-samples","shakespeare/asyoulikeit.txt"))
             log.info("Value sent")
@@ -43,11 +43,13 @@ final class FileIOTests: XCTestCase {
     }
 
     func testShakespeareWordcount() async throws {
+        //throw XCTSkip()
         try await Pipeline { pipeline in
             let contents = pipeline
                 .create(["dataflow-samples/shakespeare"])
                 .map({ value in
                     let parts = value.split(separator: "/",maxSplits: 1)
+                    print("Got filename \(parts) from \(value)")
                     return KV(parts[0].lowercased(),parts[1].lowercased())
                 })
                 .listFiles(in: GoogleStorage.self)
@@ -64,10 +66,9 @@ final class FileIOTests: XCTestCase {
             
             // Our first group by operation
             let baseCount = lines
-                .flatMap({ $0.components(separatedBy: .whitespaces) })
+                .flatMap({ (line:String) in line.components(separatedBy: .whitespaces) })
                 .groupBy({ ($0,1) })
                 .sum()
-                .log(prefix:"INTERMEDIATE OUTPUT")
             
             let normalizedCounts = baseCount.groupBy {
                 ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters),
@@ -76,7 +77,7 @@ final class FileIOTests: XCTestCase {
             
             normalizedCounts.log(prefix:"COUNT OUTPUT")
             
-        }.run(PortableRunner(loopback:true))
+        }.run(PortableRunner(port:8099,loopback:true))
     }
     
 
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 27e5972c1e7..0ea5d962d92 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -83,7 +83,7 @@ final class IntegrationTests: XCTestCase {
             normalizedCounts.log(prefix:"COUNT OUTPUT")
             errors.log(prefix:"ERROR OUTPUT")
             
-        }.run(PortableRunner(loopback:true)) 
+        }.run(PortableRunner(port:8099,loopback:true)) 
         
         
     }