You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/08/28 17:47:46 UTC

[beam] 03/22: Refactor Unary and Tuple SerializableFn into a single ClosureFn since one is just a special case of the other. Add the wordcount integration test to verify the refactoring does indeed work and start moving the pipeline code into the branch.

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5be3750fd8f5294f98867c44c8339dcd192837d8
Author: Byron Ellis <by...@google.com>
AuthorDate: Mon Aug 14 07:49:04 2023 -0700

    Refactor Unary and Tuple SerializableFn into a single ClosureFn since one is just a special case of the other. Add the wordcount integration test to verify the refactoring does indeed work and start moving the pipeline code into the branch.
---
 sdks/python/output.json-00000-of-00001             |  2 +
 sdks/swift/Documentation/INTERNALS.md              |  4 -
 .../Sources/ApacheBeam/Coders/BeamValue.swift      |  2 +
 .../swift/Sources/ApacheBeam/Coders/Beamable.swift | 24 ++++++
 .../Sources/ApacheBeam/Coders/Coder+Decoding.swift |  1 +
 sdks/swift/Sources/ApacheBeam/Coders/Coder.swift   | 31 ++++++-
 .../ApacheBeam/Core/DynamicProperties.swift        | 36 ++++++++
 .../Sources/ApacheBeam/Core/Fn/ClosureFn.swift     | 63 ++++++++++++++
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift   | 15 ++++
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift | 63 --------------
 .../Core/Fn/ParameterizedClosureFn.swift           | 74 ++++++++++++++++
 .../{SerialiableFn.swift => SerializableFn.swift}  |  4 +-
 .../Core/PCollection/AnyPCollection.swift          | 51 +++++++++++
 .../Core/PCollection/AnyPCollectionStream.swift    | 17 ++--
 .../ApacheBeam/Core/PCollection/PCollection.swift  | 32 +++++++
 .../Core/PCollection/PCollectionStream.swift       | 73 ++++++++++++++--
 .../ApacheBeam/Core/PTransform/AnyPTransform.swift | 25 ++++++
 .../ApacheBeam/Core/PTransform/PTransform.swift    | 33 ++++++++
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        | 14 ++++
 .../Core/Pipeline/PipelineTransform.swift          | 16 ++++
 .../ApacheBeam/Runtime/Worker/WorkerProvider.swift |  2 +-
 .../Sources/ApacheBeam/Transforms/Basic.swift      | 68 +++++++++++++++
 .../Sources/ApacheBeam/Transforms/BuiltIn.swift    | 98 ++++++++++++++++++++++
 .../Sources/ApacheBeam/Transforms/Combining.swift  | 22 +++++
 .../Sources/ApacheBeam/Transforms/Grouping.swift   |  8 ++
 .../Documentation.docc/ApacheBeam.md               | 37 ++++++++
 .../Tests/ApacheBeamTests/Coders/CoderTests.swift  |  9 ++
 .../Pipeline/IntegrationTests.swift                | 76 +++++++++++++++++
 28 files changed, 815 insertions(+), 85 deletions(-)

diff --git a/sdks/python/output.json-00000-of-00001 b/sdks/python/output.json-00000-of-00001
new file mode 100644
index 00000000000..8b56471c52f
--- /dev/null
+++ b/sdks/python/output.json-00000-of-00001
@@ -0,0 +1,2 @@
+{"col1":"bar","col2":2,"col3":200}
+{"col1":"baz","col2":3,"col3":300}
diff --git a/sdks/swift/Documentation/INTERNALS.md b/sdks/swift/Documentation/INTERNALS.md
deleted file mode 100644
index 757e1b44602..00000000000
--- a/sdks/swift/Documentation/INTERNALS.md
+++ /dev/null
@@ -1,4 +0,0 @@
-#  Internals
-
-
-
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift
index 054356ed6f1..b7ed89c4668 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift
@@ -37,6 +37,8 @@ public indirect enum BeamValue {
     /// A window
     case window(Window)
     
+    //TODO: Custom Values and Row Values
+    
     // Composite Values
     
     /// An iterable
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift b/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift
new file mode 100644
index 00000000000..239c248104c
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift
@@ -0,0 +1,24 @@
+
+import Foundation
+
+/// Codable is already taken and besides Beamable is too good to pass up
+public protocol Beamable {
+    static var coder: Coder { get }
+}
+
+extension Data : Beamable {
+    public static let coder:Coder = .bytes
+}
+
+extension String : Beamable {
+    public static let coder:Coder = .string
+}
+
+extension Int : Beamable {
+    public static let coder:Coder = .varint
+}
+
+extension Bool : Beamable {
+    public static let coder:Coder = .boolean
+}
+
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
index 48c1cb9db43..31b97f0d318 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
@@ -17,6 +17,7 @@
  */
 import Foundation
 
+/// This extension contains all of the decoding implementation. File separation is for clarity.
 public extension Coder {
     
     /// Decodes a raw data block into a BeamValue for further processing
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
index e8c23b9d4cf..ceb2649243e 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
@@ -18,20 +18,21 @@
 import Foundation
 
 public indirect enum Coder {
-    // Special
+    /// Catch-all for coders we don't understand. Mostly used for error reporting
     case unknown(String)
+    // TODO: Actually implement this
     case custom(Data)
     
-    // Scalar standard coders
+    /// Standard scalar coders. Does not necessarily correspond 1:1 with BeamValue. For example, varint and fixedint both map to integer
     case double,varint,fixedint,byte,bytes,string,boolean,globalwindow
     
-    // Composite standard coders
-    
+    /// Composite coders.
     case keyvalue(Coder,Coder)
     case iterable(Coder)
     case lengthprefix(Coder)
     case windowedvalue(Coder,Coder)
     
+
     // TODO: Row Coder
 }
 
@@ -71,6 +72,7 @@ public extension Coder {
         }
     }
     
+    /// Static list of coders for use in capabilities arrays in environments.
     static let capabilities:[String] = ["byte","bytes","bool","varint","double","integer","string_utf8","length_prefix","kv","iterable","windowed_value","global_window"]
         .map({ .coderUrn($0) })
 }
@@ -187,4 +189,25 @@ extension Coder {
     }
 }
 
+extension Coder {
+    
+    
+    
+    public static func of<Of>(type: Optional<Of>.Type) -> Coder? {
+        return .lengthprefix(.of(type: Of.self)!)
+    }
+    
+    public static func of<Of>(type: Array<Of>.Type) -> Coder? {
+        return .iterable(.of(type: Of.self)!)
+    }
+    
+    public static func of<Of>(type: Of.Type) -> Coder? {
+        // Beamables provide their own default coder implementation
+        if let beamable  = Of.self as? Beamable.Type {
+            return beamable.coder
+        }
+        return nil
+    }
+}
+
 
diff --git a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
new file mode 100644
index 00000000000..ee3c61a65c5
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
@@ -0,0 +1,36 @@
+import Logging
+
+public protocol DynamicProperty { }
+
+@propertyWrapper
+public struct PInput<Of> : DynamicProperty {
+    public var wrappedValue: PCollectionStream<Of>
+    
+    public init(wrappedValue: PCollectionStream<Of> = .init()) {
+        self.wrappedValue = wrappedValue
+    }
+}
+
+@propertyWrapper
+public struct POutput<Of> : DynamicProperty {
+    public var wrappedValue: PCollectionStream<Of>
+    public init(wrappedValue: PCollectionStream<Of> = .init()) {
+        self.wrappedValue = wrappedValue
+    }
+}
+
+@propertyWrapper
+public struct Logger : DynamicProperty {
+    public var wrappedValue: Logging.Logger
+    public init(wrappedValue: Logging.Logger = Logging.Logger(label: "TEST")) {
+        self.wrappedValue = wrappedValue
+    }
+}
+
+@propertyWrapper
+public struct Serialized<Value:Codable> : DynamicProperty {
+    public var wrappedValue: Value?
+    public init(wrappedValue: Value? = nil) {
+        self.wrappedValue = wrappedValue
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
new file mode 100644
index 00000000000..fcf43a91215
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// A SerializableFn that holds a reference to a function that takes a single input and produces a variable number of outputs
+public final class ClosureFn : SerializableFn {
+    let processClosure: (SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void
+
+    //TODO: Replace this with a parameter pack version once I figure out how to do that
+
+    public init<Of>(_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream())
+        }
+    }
+    
+    public init<Of,O0>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream(),outputs[0].stream())
+        }
+    }
+
+    public init<Of,O0,O1>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream())
+        }
+    }
+    
+    public init<Of,O0,O1,O2>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream())
+        }
+    }
+
+    public init<Of,O0,O1,O2,O3>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream) async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream())
+        }
+    }
+    
+    public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
+        try await processClosure(context,inputs,outputs)
+        outputs.finish()
+        return (context.instruction,context.transform)
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
new file mode 100644
index 00000000000..58d1c211c81
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
@@ -0,0 +1,15 @@
+
+import Logging
+
+/// A higher level interface to SerializableFn using dependency injected dynamic properties in the same
+/// way as we define Composite PTransforms
+public protocol DoFn {
+    func process() async throws
+    func finishBundle() async throws
+}
+
+public extension DoFn {
+    func finishBundle() async throws { }
+}
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift
deleted file mode 100644
index 82fd4708046..00000000000
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-import Foundation
-
-public final class DoneFn<Of> : SerializableFn {
-
-    private let fn: (PCollectionStream<Of>) async throws -> Void
-    public init(_ fn: @Sendable @escaping (PCollectionStream<Of>) async throws -> Void) {
-        self.fn = fn
-    }
-    
-    
-    public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
-        try await fn(inputs[0].stream())
-        for output in outputs {
-            output.finish()
-        }
-        return (context.instruction,context.transform)
-    }
-}
-
-public final class ParameterizedDoneFn<Of,Param:Codable> : SerializableFn {
-    
-    
-    private let param: Param
-    private let fn: (Param,PCollectionStream<Of>) async throws -> Void
-    
-    public init(_ param: Param,_ fn: @Sendable @escaping (Param,PCollectionStream<Of>) async throws -> Void){
-        self.param = param
-        self.fn = fn
-    }
-    
-    public var payload: Data {
-        get throws {
-            try JSONEncoder().encode(param)
-        }
-    }
-    
-    public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
-        try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream())
-        for output in outputs {
-            output.finish()
-        }
-        return (context.instruction,context.transform)
-    }
-
-}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift
new file mode 100644
index 00000000000..575886d9f8c
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift
@@ -0,0 +1,74 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// A SerializableFn that holds a reference to a function that takes a single input and produces a variable number of outputs
+public final class ParameterizedClosureFn<Param:Codable> : SerializableFn {
+    let param: Param
+    let processClosure: (SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void
+
+    //TODO: Replace this with a parameter pack version once I figure out how to do that
+
+    public init<Of>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream())
+        }
+    }
+    
+    public init<Of,O0>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream())
+        }
+    }
+
+    public init<Of,O0,O1>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream())
+        }
+    }
+    
+    public init<Of,O0,O1,O2>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream())
+        }
+    }
+
+    public init<Of,O0,O1,O2,O3>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream())
+        }
+    }
+    
+    public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
+        try await processClosure(context,inputs,outputs)
+        outputs.finish()
+        return (context.instruction,context.transform)
+    }
+    
+    public var payload: Data {
+        get throws {
+            try JSONEncoder().encode(param)
+        }
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
similarity index 86%
rename from sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift
rename to sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
index 3a30066dc4b..babf64dfe31 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
@@ -25,7 +25,8 @@ public struct SerializableFnBundleContext {
     let log:Logger
 }
 
-/// SerialiableFn is a protocol for functions that should be parameterized for the pipeline
+/// SerializableFn is a protocol for functions that should be parameterized for the pipeline. This is intended as a fairly low level class and users
+/// should interact with the apply() functions defined in the transform section or implement the DoFn protocol which is then wrapped
 public protocol SerializableFn {
     var  payload: Data { get throws }
     func process(context:SerializableFnBundleContext,inputs:[AnyPCollectionStream],outputs:[AnyPCollectionStream]) async throws -> (String,String)
@@ -35,4 +36,3 @@ public protocol SerializableFn {
 public extension SerializableFn {
     var payload : Data { Data() }
 }
-
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
new file mode 100644
index 00000000000..8f23dccb954
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -0,0 +1,51 @@
+public struct AnyPCollection : PCollectionProtocol {
+    
+    
+    
+    let type: Any.Type
+    let ofType: Any.Type
+    let collection: Any
+    
+    let applyClosure: (Any,PipelineTransform) -> Void
+    let consumersClosure: (Any) -> [PipelineTransform]
+    let coderClosure: (Any) -> Coder
+    let streamClosure: (Any) -> AnyPCollectionStream
+    
+    public init<C>(_ collection: C) where C : PCollectionProtocol {
+        if let anyCollection = collection as? AnyPCollection {
+            self = anyCollection
+        } else {
+            self.type = C.self
+            self.ofType = C.Of.self
+            self.collection = collection
+            
+            self.applyClosure = { ($0 as! C).apply($1) }
+            self.consumersClosure = { ($0 as! C).consumers }
+            self.coderClosure = { ($0 as! C).coder }
+            self.streamClosure = { AnyPCollectionStream(($0 as! C).stream) }
+        }
+    }
+    
+    
+    public var consumers: [PipelineTransform] {
+        consumersClosure(collection)
+    }
+    
+    public func apply(_ transform: PipelineTransform) {
+        applyClosure(collection,transform)
+    }
+
+    
+    public var coder: Coder {
+        coderClosure(collection)
+    }
+
+    public var stream: PCollectionStream<Never> {
+        fatalError("Do not use `stream` on AnyPCollection. Use `anyStream` instead.")
+    }
+    
+    public var anyStream: AnyPCollectionStream {
+        streamClosure(collection)
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
index da15c740771..df11ecd3780 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
@@ -33,7 +33,7 @@ public struct AnyPCollectionStream : AsyncSequence {
     
     let value: Any
     let nextGenerator: (Any) -> (() async throws -> Iterator.Element?)
-    let emitClosure: (Any,Any) -> Void
+    let emitClosure: (Any,Any) throws -> Void
     let finishClosure: (Any) -> Void
     
     public func makeAsyncIterator() -> Iterator {
@@ -50,12 +50,12 @@ public struct AnyPCollectionStream : AsyncSequence {
         self.emitClosure = {
             let stream = ($0 as! PCollectionStream<Of>)
             if let beamValue = $1 as? BeamValue {
-                stream.emit(beamValue)
+                try stream.emit(beamValue)
             } else if let element = $1 as? Element {
                 stream.emit((element.0 as! Of,element.1,element.2))
             } else if let element = $1 as? PCollectionStream<Of>.Element {
                 stream.emit(element)
-            }
+            } 
         }
         
         self.finishClosure = {
@@ -81,6 +81,13 @@ public struct AnyPCollectionStream : AsyncSequence {
     public func finish() {
         finishClosure(self.value)
     }
-    
-    
+}
+
+/// Convenience function allowing an array of AnyPCollectionStream elements to be finished in one call.
+public extension Array where Array.Element == AnyPCollectionStream {
+    func finish() {
+        for stream in self {
+            stream.finish()
+        }
+    }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
new file mode 100644
index 00000000000..5fb52f7e7ea
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -0,0 +1,32 @@
+public protocol PCollectionProtocol {
+    associatedtype Of
+    
+    typealias Stream = PCollectionStream<Of>
+    
+    var consumers: [PipelineTransform] { get }
+    var coder: Coder { get }
+    var stream: Stream { get }
+    
+    func apply(_ transform: PipelineTransform)
+}
+
+
+public final class PCollection<Of> : PCollectionProtocol {
+
+    public let coder: Coder
+    public var consumers: [PipelineTransform]
+    
+    public init(coder: Coder = .of(type: Of.self)!,consumers:[PipelineTransform] = []) {
+        self.coder = coder
+        self.consumers = consumers
+    }
+
+    public var stream: PCollectionStream<Of> {
+        return PCollectionStream<Of>()
+    }
+    
+    public func apply(_ transform: PipelineTransform) {
+        consumers.append(transform)
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
index d54e2638230..1d2b3cbe151 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -26,10 +26,7 @@ public final class PCollectionStream<Of> : AsyncSequence {
     private let emitter: AsyncStream<Element>.Continuation
     
     public init() {
-        //Construct a stream, capturing the emit continuation
-        var tmp: AsyncStream<Element>.Continuation?
-        self.stream = AsyncStream<Element> { tmp = $0 }
-        self.emitter = tmp!
+        (self.stream,self.emitter) = AsyncStream.makeStream(of:Element.self)
     }
     
     public func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
@@ -49,8 +46,72 @@ public final class PCollectionStream<Of> : AsyncSequence {
         emit((value,timestamp,window))
     }
     
-    public func emit(_ value: BeamValue) {
-        
+    // Implementing key-value pair conversion is a little more complicated because we need
+    // to convert to a KV<K,V> from what is essentially a KV<Any,Any> which requires us to
+    // cast the key and the value first and then construct the KV from that. There might be
+    // a more clever way of doing this, but I don't know what it is.
+    
+    func emit<K,V>(key: K,value: [V],timestamp: Date,window:Window) {
+        emit(KV(key,value) as! Of,timestamp:timestamp,window:window)
+    }
+    
+    func emit<K>(key: K,value: BeamValue,timestamp: Date,window:Window) throws {
+        //We overload the key value type as both (K,[V]) and (K,V). It may be worth considering
+        //having an explicit Pair type in addition to KV to simplify this decoding a little bit.
+        //
+        // On the other hand, the code is already written and pretty straightforward and there
+        // won't be much in the way of new scalar values.
+        if case .array(let array) = value {
+            switch array.first {
+            case .boolean(_):emit(key:key,value:array.map({$0.baseValue as! Bool}),timestamp:timestamp,window:window)
+            case .bytes(_):  emit(key:key,value:array.map({$0.baseValue as! Data}),timestamp:timestamp,window:window)
+            case .double(_): emit(key:key,value:array.map({$0.baseValue as! Double}),timestamp:timestamp,window:window)
+            case .integer(_):emit(key:key,value:array.map({$0.baseValue as! Int}),timestamp:timestamp,window:window)
+            case .string(_): emit(key:key,value:array.map({$0.baseValue as! String}),timestamp:timestamp,window:window)
+            default:
+                throw ApacheBeamError.runtimeError("Can't use \(String(describing:array.first)) as a value in a key value pair")
+            }
+        } else {
+            switch value {
+            case let .boolean(v):emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .bytes(v):  emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .double(v): emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .integer(v):emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .string(v): emit(key:key,value:[v],timestamp:timestamp,window:window)
+            default:
+                throw ApacheBeamError.runtimeError("Can't use \(value) as a value in a key value pair")
+            }
+
+        }
+    }
+    
+    // Unwrap all of the actual value types (not windows or windowed elements)
+    func emit(_ value: BeamValue,timestamp: Date,window:Window) throws {
+        if case let .kv(key,value) = value {
+            // Unwrap the key first
+            switch key {
+            case let .boolean(v):try emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .bytes(v):  try emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .double(v): try emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .integer(v):try emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .string(v): try emit(key:v,value:value,timestamp:timestamp,window:window)
+            default:
+                throw ApacheBeamError.runtimeError("Can't use \(value) as a value in a key value pair")
+            }
+        } else {
+            emit(value.baseValue as! Of,timestamp: timestamp,window: window)
+        }
+    }
+    
+    /// Mostly intended as a convenience for bundle processing, this emit unwraps a windowed value
+    /// and delegates further conversion to the non-public overloads of this function.
+    public func emit(_ value: BeamValue) throws {
+        switch value {
+        case let .windowed(value, timestamp, _, window):
+            try emit(value,timestamp:timestamp,window:window.baseValue as! Window)
+        default:
+            throw ApacheBeamError.runtimeError("Only windowed values can be sent directly to a PCollectionStream, not \(value)")
+        }
     }
     
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift
new file mode 100644
index 00000000000..0be179c8cdf
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift
@@ -0,0 +1,25 @@
+public struct AnyPTransform : _PrimitivePTransform {
+    let type: Any.Type
+    var transform: Any
+    
+    let expandClosure: (Any) -> AnyPTransform
+    let expansionType: Any.Type
+    
+    
+    public init<T>(_ transform: T) where T: PTransform {
+        if let anyTransform = transform as? AnyPTransform {
+            self = anyTransform
+        } else {
+            self.type = T.self
+            self.expansionType = T.Expansion.self
+            self.transform = transform
+            self.expandClosure = { AnyPTransform(($0 as! T).expand) }
+        }
+    }
+}
+
+extension AnyPTransform : ParentPTransform {
+    public var children: [AnyPTransform] {
+        (transform as? ParentPTransform)?.children ?? []
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
new file mode 100644
index 00000000000..483a784c420
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
@@ -0,0 +1,33 @@
+/// Represents a composite transform
+public protocol PTransform {
+    associatedtype Expansion: PTransform
+    
+    var expand: Expansion { get }
+}
+
+public extension Never {
+    var expand: Never {
+        fatalError()
+    }
+}
+
+extension Never : PTransform { }
+
+/// Represents PTransforms that can't be expanded further. When constructing the pipeline the expansion
+/// happens until we hit this point
+public protocol _PrimitivePTransform : PTransform where Expansion == Never { }
+public extension _PrimitivePTransform {
+    var expand: Never {
+        neverExpand(String(reflecting: Self.self)) }
+}
+
+
+public protocol ParentPTransform {
+    var children: [AnyPTransform] { get }
+}
+
+protocol GroupPTransform : ParentPTransform { }
+
+public func neverExpand(_ type: String) -> Never {
+    fatalError("\(type) is a primitive PTransform and cannot be expanded.")
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
new file mode 100644
index 00000000000..f918a620ea1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -0,0 +1,14 @@
+import GRPC
+import Logging
+
+public final class Pipeline {
+    let content: (inout PCollection<Never>) -> Void
+    let log: Logging.Logger
+    
+    public init(log: Logging.Logger = .init(label:"Pipeline"),_ content: @escaping (inout PCollection<Never>) -> Void) {
+        self.log = log
+        self.content = content
+    }
+    
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
new file mode 100644
index 00000000000..dea7daab738
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
@@ -0,0 +1,16 @@
+
+import Foundation
+
+/// Enum for pipeline representable transforms as opposed to composite transforms
+/// which are a user-side construct represented by PTransform
+public enum PipelineTransform {
+    case pardo(String,SerializableFn,[AnyPCollection])
+    case impulse(AnyPCollection)
+    case flatten([AnyPCollection],AnyPCollection)
+    case groupByKey(AnyPCollection)
+    case custom(String,Data,[AnyPCollection])
+    case composite(AnyPTransform)
+}
+
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
index fb5e2e52225..e45c3f7eacb 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
@@ -20,7 +20,7 @@ import Logging
 
 actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorkerPoolAsyncProvider {
 
-    private let log = Logger(label:"WorkerProvider")
+    private let log = Logging.Logger(label: "Worker")
     private var workers: [String:Worker] = [:]
 
     private let functions: [String:SerializableFn]
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
new file mode 100644
index 00000000000..52f2b28aef1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -0,0 +1,68 @@
+
+/// Creating Static Values
+public extension PCollection {
+
+    /// Each time the input fires, output all of the values in this list.
+    func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> {
+        return pardo(name,values) { values,input,output in
+            for try await (_,ts,w) in input {
+                for v in values {
+                    output.emit(v,timestamp:ts,window:w)
+                }
+            }
+        }
+    }
+}
+
+/// Convenience logging mappers
+public extension PCollection {
+    func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<Of> where Of == String {
+        pardo(name,prefix) { prefix,input,output in
+            for await element in input {
+                print("\(prefix): \(element)")
+                output.emit(element)
+            }
+        }
+    }
+}
+
+/// Modifying Values
+public extension PCollection {
+    
+    /// Modify a value without changing its window or timestamp
+    func map<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
+        return pardo(name) { input,output in
+            for try await (v,ts,w) in input {
+                output.emit(fn(v),timestamp:ts,window:w)
+            }
+        }
+    }
+    
+    func map<K,V>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
+        return pardo(name) { input,output in
+            for try await (i,ts,w) in input {
+                let (key,value) = fn(i)
+                output.emit(KV(key,value),timestamp:ts,window:w)
+            }
+        }
+    }
+
+    /// Produce multiple outputs as a single value without modifying window or timestamp
+    func flatMap<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> {
+        return pardo(name) { input,output in
+            for try await (v,ts,w) in input {
+                for i in fn(v) {
+                    output.emit(i,timestamp:ts,window:w)
+                }
+            }
+        }
+    }
+
+}
+
+public extension PCollection<Never> {
+    /// Convenience function to add an impulse when we are at the root of the pipeline
+    func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> {
+        return impulse().create(values,name)
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
new file mode 100644
index 00000000000..265fdbd58f1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
@@ -0,0 +1,98 @@
+
+import Foundation
+
+public extension PCollection where Of == Never {
+    /// Impulse, the most basic transform. It can only be attached to PCollections of type Never,
+    /// which is the root transform used by Pipelines.
+    func impulse() -> PCollection<Data> {
+        let output = PCollection<Data>()
+        self.apply(.impulse(AnyPCollection(output)))
+        return output
+    }
+}
+
+/// ParDo is the core user operator that pretty much everything else gets built on. Several variants are provided here
+public extension PCollection {
+    // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved
+    
+    // No Output
+    func pardo<F:SerializableFn>(_ name: String = "\(#file):\(#line)",_ fn: F) {
+        self.apply(.pardo(name, fn, []))
+    }
+    func pardo(_ name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
+        self.apply(.pardo(name, ClosureFn(fn),[]))
+    }
+    func pardo<Param:Codable>(_ name: String = "\(#file):\(#line)",_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
+        self.apply(.pardo(name, ParameterizedClosureFn(param,fn), []))
+    }
+
+
+    // Single Output
+    func pardo<F:SerializableFn,O0>(_ name: String = "\(#file):\(#line)",_ fn: F,
+                                    _ o0:PCollection<O0>) {
+        self.apply(.pardo(name, fn, [AnyPCollection(o0)]))
+    }
+    func pardo<O0>(_ name: String = "\(#file):\(#line)",
+                   _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
+        let output = PCollection<O0>()
+        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)]))
+        return output
+    }
+    func pardo<Param:Codable,O0>(_ name: String = "\(#file):\(#line)",_ param: Param,
+                   _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
+        let output = PCollection<O0>()
+        self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
+        return output
+    }
+
+    // Two Outputs
+    func pardo<F:SerializableFn,O0,O1>(_ name: String = "\(#file):\(#line)",_ fn: F,
+                                    _ o0:PCollection<O0>,_ o1:PCollection<O1>) {
+        self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)]))
+    }
+    func pardo<O0,O1>(_ name: String = "\(#file):\(#line)",
+                      _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
+        let output = (PCollection<O0>(),PCollection<O1>())
+        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        return output
+    }
+    func pardo<Param:Codable,O0,O1>(_ name: String = "\(#file):\(#line)",_ param: Param,
+                                    _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
+        let output = (PCollection<O0>(),PCollection<O1>())
+        self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        return output
+    }
+
+    // Three Outputs
+    func pardo<F:SerializableFn,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ fn: F,
+                                          _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) {
+        self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
+    }
+    func pardo<O0,O1,O2>(_ name: String = "\(#file):\(#line)",
+                         _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
+        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
+        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        return output
+    }
+    func pardo<Param:Codable,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ param: Param,
+                                       _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
+        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
+        self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        return output
+    }
+
+    //TODO: Add more as needed
+}
+
+public extension PCollection {
+    /// Core GroupByKey transform. Requires a pair input
+    func groupByKey<K,V>() -> PCollection<KV<K,V>> where Of == KV<K,V> {
+        // Adjust the coder for the PCollection to reflect GBK semantics
+        let output = PCollection<KV<K,V>>(coder:.keyvalue(.of(type: K.self)!, .of(type: Array<V>.self)!))
+        self.apply(.groupByKey(AnyPCollection(output)))
+        return output
+    }
+    
+    
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
new file mode 100644
index 00000000000..1b60f2a49cb
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -0,0 +1,22 @@
+/// Basic reducers
+public extension PCollection {
+    func reduce<Result:Codable,K,V>(name:String = "\(#file):\(#line)",into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> {
+        return pardo(name,into) { initialValue,input,output in
+            for await (kv,ts,w) in input {
+                var result = initialValue
+                for v in kv.values {
+                    accumulator(v,&result)
+                }
+                output.emit(KV(kv.key,result),timestamp:ts,window:w)
+            }
+                
+        }
+    }
+}
+
+/// Convenience functions
+public extension PCollection {
+    func sum<K,V:Numeric&Codable>() -> PCollection<KV<K,V>> where Of == KV<K,V> {
+        return reduce(into: 0,{ a,b in b = b + a })
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
new file mode 100644
index 00000000000..e2025394471
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
@@ -0,0 +1,8 @@
+/// Basic grouping functionality
+///
+public extension PCollection {
+    func groupBy<K,V>(name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
+        return map(name:name,fn)
+            .groupByKey()
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md b/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md
new file mode 100644
index 00000000000..afbac1a07f1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md
@@ -0,0 +1,37 @@
+# ``ApacheBeam``
+
+A Swift SDK implementation for Beam
+
+@Metadata {
+    @DisplayName("Apache Beam SDK for Swift")
+}
+
+## Overview
+
+The Apache Beam SDK for Swift allows Swift developers to create executables that can be submitted to all Beam Portable Runners including Flink and Dataflow. 
+
+To use the Apache Beam SDK for Swift, first add it as a dependency to an executable project:
+
+```swift
+let package = Package(
+    // name, platforms, products, etc.
+    dependencies: [
+        // other dependencies
+        .package(url: "https://github.com/apache/beam/sdks/swift", from: "2.51.0"),
+    ],
+    targets: [
+        // targets
+    ]
+)
+```
+
+> Note: Swift 5.9 or higher is required in order to use the Swift SDK
+
+## Topics
+
+### Getting Started
+
+
+
+
+
diff --git a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
index 74031c8b545..acd75219f50 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
@@ -22,6 +22,15 @@ import XCTest
 @testable import ApacheBeam
 
 final class CoderTests: XCTestCase {
+    
+    func testSimpleScalarConversions() throws {
+        XCTAssertTrue(Coder.of(type: Data.self) == .bytes)
+        XCTAssertTrue(Coder.of(type: String.self) == .string)
+        XCTAssertTrue(Coder.of(type: Bool.self) == .boolean)
+        XCTAssertTrue(Coder.of(type: Int.self) == .varint)
+    }
+    
+    
     func testDefaultImpulseDecode() throws {
         var impulse = Data([0x7f,0xdf,0x3b,0x64,0x5a,0x1c,0xac,0x09,0x00,0x00,0x00,0x01,0x0f,0x00])
         let impulseCoder = Coder.windowedvalue(.bytes, .globalwindow)
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
new file mode 100644
index 00000000000..3e2d6a3be7d
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -0,0 +1,76 @@
+//
+//  IntegrationTests.swift
+//  
+//
+//  Created by Byron Ellis on 8/11/23.
+//
+
+import XCTest
+import ApacheBeam
+
+func fixtureData(_ fixture: String) throws -> Data {
+    try Data(contentsOf: fixtureUrl(fixture))
+}
+
+
+func fixtureUrl(_ fixture: String) -> URL {
+    fixturesDirectory().appendingPathComponent(fixture)
+}
+
+
+func fixturesDirectory(path: String = #file) -> URL {
+    let url = URL(fileURLWithPath: path)
+    let testsDir = url.deletingLastPathComponent()
+    let res = testsDir.appendingPathComponent("Fixtures")
+    return res
+}
+
+final class IntegrationTests: XCTestCase {
+
+    override func setUpWithError() throws {
+    }
+
+    override func tearDownWithError() throws {
+    }
+
+    func testPortableWordcount() throws {
+        _ = Pipeline { pipeline in
+            let (contents,errors) = pipeline
+                .create(["file1.txt","file2.txt","missing.txt"])
+                .pardo { filenames,output,errors in
+                    for await (filename,_,_) in filenames {
+                        do {
+                            output.emit(String(decoding:try fixtureData(filename),as:UTF8.self))
+                        } catch {
+                            errors.emit("Unable to read \(filename): \(error)")
+                        }
+                    }
+                }
+            
+            // Simple ParDo that takes advantage of enumerateLines
+            let lines = contents.pardo { contents,lines in
+                for await (content,_,_) in contents {
+                    content.enumerateLines { line,_ in
+                        lines.emit(line)
+                    }
+                }
+            }
+            
+            // Our first group by operation
+            let baseCount = lines
+                .flatMap({ $0.components(separatedBy: .whitespaces) })
+                .groupBy({ ($0,1) })
+                .sum()
+            
+            let normalizedCounts = baseCount.groupBy {
+                ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters),
+                 $0.value ?? 1)
+            }.sum()
+            
+            
+            
+        }
+    }
+
+
+}