You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/05/19 16:46:16 UTC

[beam] branch master updated: Update the PTransform and associated APIs to be less class-based. (#17699)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e5ca0c0fa4 Update the PTransform and associated APIs to be less class-based. (#17699)
6e5ca0c0fa4 is described below

commit 6e5ca0c0fa450a262fce5fb35dd7c66949e81401
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu May 19 10:46:10 2022 -0600

    Update the PTransform and associated APIs to be less class-based. (#17699)
---
 sdks/typescript/README.md                          |   6 +
 sdks/typescript/src/apache_beam/coders/coders.ts   |  28 ++-
 .../src/apache_beam/examples/wordcount.ts          |  19 +-
 .../src/apache_beam/examples/wordcount_sql.ts      |  10 +-
 .../src/apache_beam/examples/wordcount_textio.ts   |  19 +-
 .../src/apache_beam/internal/pipeline.ts           |  36 ++-
 sdks/typescript/src/apache_beam/internal/urns.ts   |   1 -
 sdks/typescript/src/apache_beam/io/textio.ts       |  22 +-
 sdks/typescript/src/apache_beam/pvalue.ts          | 104 +++++----
 .../src/apache_beam/runners/direct_runner.ts       |  12 +-
 sdks/typescript/src/apache_beam/testing/assert.ts  |  53 ++---
 .../src/apache_beam/transforms/create.ts           |  29 +--
 .../src/apache_beam/transforms/external.ts         |  23 +-
 .../src/apache_beam/transforms/flatten.ts          |  26 ++-
 .../apache_beam/transforms/group_and_combine.ts    | 161 +++++++-------
 .../typescript/src/apache_beam/transforms/index.ts |   2 +-
 .../src/apache_beam/transforms/internal.ts         | 150 +++++++------
 .../typescript/src/apache_beam/transforms/pardo.ts | 245 +++++++++------------
 sdks/typescript/src/apache_beam/transforms/sql.ts  |  35 ++-
 .../src/apache_beam/transforms/transform.ts        |  30 ++-
 .../src/apache_beam/transforms/window.ts           | 186 ++++++++--------
 .../src/apache_beam/transforms/windowings.ts       | 135 +++++-------
 .../typescript/src/apache_beam/worker/operators.ts |  78 +++----
 sdks/typescript/test/combine_test.ts               |  56 +++--
 sdks/typescript/test/primitives_test.ts            |  88 ++++----
 sdks/typescript/test/standard_coders_test.ts       |  24 +-
 sdks/typescript/test/wordcount.ts                  |  28 +--
 27 files changed, 745 insertions(+), 861 deletions(-)

diff --git a/sdks/typescript/README.md b/sdks/typescript/README.md
index 7971bf1d563..745170bba65 100644
--- a/sdks/typescript/README.md
+++ b/sdks/typescript/README.md
@@ -93,6 +93,12 @@ used in Python. These can be "ordinary" javascript objects (which are passed
 as is) or special DoFnParam objects which provide getters to element-specific
 information (such as the current timestamp, window, or side input) at runtime.
 
+* Rather than introduce multiple-output complexity into the map/do operations
+themselves, producing multiple outputs is done by following with a new
+`Split` primitive that takes a
+`PCollection<{a?: AType, b: BType, ... }>` and produces an object
+`{a: PCollection<AType>, b: PCollection<BType>, ...}`.
+
 * Javascript supports (and encourages) an asynchronous programing model, with
 many libraries requiring use of the async/await paradigm.
 As there is no way (by design) to go from the asyncronous style back to
diff --git a/sdks/typescript/src/apache_beam/coders/coders.ts b/sdks/typescript/src/apache_beam/coders/coders.ts
index ecb276a4bb6..3a9547fff34 100644
--- a/sdks/typescript/src/apache_beam/coders/coders.ts
+++ b/sdks/typescript/src/apache_beam/coders/coders.ts
@@ -38,16 +38,36 @@ interface Class<T> {
  */
 class CoderRegistry {
   internal_registry = {};
-  get(urn: string): Class<Coder<unknown>> {
-    const constructor: Class<Coder<unknown>> = this.internal_registry[urn];
+
+  getCoder(
+    urn: string,
+    payload: Uint8Array | undefined = undefined,
+    ...components: Coder<unknown>[]
+  ) {
+    const constructor: (...args) => Coder<unknown> =
+      this.internal_registry[urn];
     if (constructor === undefined) {
       throw new Error("Could not find coder for URN " + urn);
     }
-    return constructor;
+    if (payload && payload.length > 0) {
+      return constructor(payload, ...components);
+    } else {
+      return constructor(...components);
+    }
   }
 
+  // TODO: Figure out how to branch on constructors (called with new) and
+  // ordinary functions.
   register(urn: string, coderClass: Class<Coder<any>>) {
-    this.internal_registry[urn] = coderClass;
+    this.registerClass(urn, coderClass);
+  }
+
+  registerClass(urn: string, coderClass: Class<Coder<any>>) {
+    this.registerConstructor(urn, (...args) => new coderClass(...args));
+  }
+
+  registerConstructor(urn: string, constructor: (...args) => Coder<any>) {
+    this.internal_registry[urn] = constructor;
   }
 }
 
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts
index 961afb43e9b..e34b8cab3c5 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts
@@ -34,20 +34,7 @@ import * as yargs from "yargs";
 
 import * as beam from "../../apache_beam";
 import { createRunner } from "../runners/runner";
-
-import { count } from "../transforms/combiners";
-import { GroupBy } from "../transforms/group_and_combine";
-
-class CountElements extends beam.PTransform<
-  beam.PCollection<any>,
-  beam.PCollection<any>
-> {
-  expand(input: beam.PCollection<any>) {
-    return input
-      .map((e) => ({ element: e }))
-      .apply(new GroupBy("element").combining("element", count, "count"));
-  }
-}
+import { countPerElement } from "../transforms/group_and_combine";
 
 function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
   return lines
@@ -55,13 +42,13 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
     .flatMap(function* (line: string) {
       yield* line.split(/[^a-z]+/);
     })
-    .apply(new CountElements("Count"));
+    .apply(countPerElement());
 }
 
 async function main() {
   await createRunner(yargs.argv).run((root) => {
     const lines = root.apply(
-      new beam.Create([
+      beam.create([
         "In the beginning God created the heaven and the earth.",
         "And the earth was without form, and void; and darkness was upon the face of the deep.",
         "And the Spirit of God moved upon the face of the waters.",
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
index f27a0899710..c2453d64b93 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
@@ -24,9 +24,7 @@ import * as textio from "../io/textio";
 
 import { PortableRunner } from "../runners/portable_runner/runner";
 
-import * as internal from "../../apache_beam/transforms/internal";
-import { RowCoder } from "../coders/row_coder";
-import { SqlTransform } from "../transforms/sql";
+import { sqlTransform } from "../transforms/sql";
 
 async function main() {
   // python apache_beam/runners/portability/local_job_service_main.py --port 3333
@@ -34,13 +32,13 @@ async function main() {
     environmentType: "LOOPBACK",
     jobEndpoint: "localhost:3333",
   }).run(async (root) => {
-    const lines = root.apply(new beam.Create(["a", "b", "c", "c"]));
+    const lines = root.apply(beam.create(["a", "b", "c", "c"]));
 
     const filtered = await lines
       .map((w) => ({ word: w }))
-      .apply(new internal.WithCoderInternal(RowCoder.fromJSON({ word: "str" })))
+      .apply(beam.withRowCoder({ word: "str" }))
       .asyncApply(
-        new SqlTransform(
+        sqlTransform(
           "SELECT word, count(*) as c from PCOLLECTION group by word"
         )
       );
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
index 03f7149e285..686be3b0594 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
@@ -21,22 +21,9 @@
 import * as beam from "../../apache_beam";
 import * as textio from "../io/textio";
 import { DirectRunner } from "../runners/direct_runner";
-
-import { count } from "../transforms/combiners";
-import { GroupBy } from "../transforms/group_and_combine";
-
 import { PortableRunner } from "../runners/portable_runner/runner";
 
-class CountElements extends beam.PTransform<
-  beam.PCollection<any>,
-  beam.PCollection<any>
-> {
-  expand(input: beam.PCollection<any>) {
-    return input
-      .map((e) => ({ element: e }))
-      .apply(new GroupBy("element").combining("element", count, "count"));
-  }
-}
+import { countPerElement } from "../transforms/group_and_combine";
 
 function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
   return lines
@@ -44,14 +31,14 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
     .flatMap(function* (line: string) {
       yield* line.split(/[^a-z]+/);
     })
-    .apply(new CountElements("Count"));
+    .apply(countPerElement());
 }
 
 async function main() {
   // python apache_beam/runners/portability/local_job_service_main.py --port 3333
   await new PortableRunner("localhost:3333").run(async (root) => {
     const lines = await root.asyncApply(
-      new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
+      textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
     );
 
     lines.apply(wordCount).map(console.log);
diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts
index 4b9f6a7b52a..f333bb6c99b 100644
--- a/sdks/typescript/src/apache_beam/internal/pipeline.ts
+++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts
@@ -21,13 +21,13 @@ import equal from "fast-deep-equal";
 import * as runnerApi from "../proto/beam_runner_api";
 import * as fnApi from "../proto/beam_fn_api";
 import {
-  PTransform,
-  AsyncPTransform,
+  PTransformClass,
+  AsyncPTransformClass,
   extractName,
 } from "../transforms/transform";
-import { GlobalWindows } from "../transforms/windowings";
+import { globalWindows } from "../transforms/windowings";
 import * as pvalue from "../pvalue";
-import { WindowInto } from "../transforms/window";
+import { createWindowingStrategyProto } from "../transforms/window";
 import * as environments from "./environments";
 import { Coder, globalRegistry as globalCoderRegistry } from "../coders/coders";
 
@@ -45,18 +45,14 @@ export class PipelineContext {
     const this_ = this;
     if (this.coders[coderId] == undefined) {
       const coderProto = this.components.coders[coderId];
-      const coderConstructor = globalCoderRegistry().get(coderProto.spec!.urn);
-      const components = (coderProto.componentCoderIds || []).map(
-        this_.getCoder.bind(this_)
+      const components: Coder<unknown>[] = (
+        coderProto.componentCoderIds || []
+      ).map(this_.getCoder.bind(this_));
+      this.coders[coderId] = globalCoderRegistry().getCoder(
+        coderProto.spec!.urn,
+        coderProto.spec!.payload,
+        ...components
       );
-      if (coderProto.spec!.payload?.length) {
-        this.coders[coderId] = new coderConstructor(
-          coderProto.spec!.payload,
-          ...components
-        );
-      } else {
-        this.coders[coderId] = new coderConstructor(...components);
-      }
     }
     return this.coders[coderId];
   }
@@ -132,13 +128,13 @@ export class Pipeline {
       environments.defaultJsEnvironment();
     this.context = new PipelineContext(this.proto.components!);
     this.proto.components!.windowingStrategies[this.globalWindowing] =
-      WindowInto.createWindowingStrategy(this, new GlobalWindows());
+      createWindowingStrategyProto(this, globalWindows());
   }
 
   preApplyTransform<
     InputT extends pvalue.PValue<any>,
     OutputT extends pvalue.PValue<any>
-  >(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
+  >(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
     const this_ = this;
     const transformId = this.context.createUniqueName("transform");
     let parent: runnerApi.PTransform | undefined = undefined;
@@ -168,7 +164,7 @@ export class Pipeline {
   applyTransform<
     InputT extends pvalue.PValue<any>,
     OutputT extends pvalue.PValue<any>
-  >(transform: PTransform<InputT, OutputT>, input: InputT) {
+  >(transform: PTransformClass<InputT, OutputT>, input: InputT) {
     const { id: transformId, proto: transformProto } = this.preApplyTransform(
       transform,
       input
@@ -186,7 +182,7 @@ export class Pipeline {
   async asyncApplyTransform<
     InputT extends pvalue.PValue<any>,
     OutputT extends pvalue.PValue<any>
-  >(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
+  >(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
     const { id: transformId, proto: transformProto } = this.preApplyTransform(
       transform,
       input
@@ -205,7 +201,7 @@ export class Pipeline {
     InputT extends pvalue.PValue<any>,
     OutputT extends pvalue.PValue<any>
   >(
-    transform: AsyncPTransform<InputT, OutputT>,
+    transform: AsyncPTransformClass<InputT, OutputT>,
     transformProto: runnerApi.PTransform,
     result: OutputT
   ) {
diff --git a/sdks/typescript/src/apache_beam/internal/urns.ts b/sdks/typescript/src/apache_beam/internal/urns.ts
index 4121444a308..e08ac5803bf 100644
--- a/sdks/typescript/src/apache_beam/internal/urns.ts
+++ b/sdks/typescript/src/apache_beam/internal/urns.ts
@@ -24,7 +24,6 @@ export const IDENTITY_DOFN_URN = "beam:dofn:identity:0.1";
 
 export const SERIALIZED_JS_DOFN_INFO = "beam:dofn:serialized_js_dofn_info:v1";
 export const SPLITTING_JS_DOFN_URN = "beam:dofn:splitting_dofn:v1";
-export const SPLITTING2_JS_DOFN_URN = "beam:dofn:splitting2_dofn:v1";
 export const JS_WINDOW_INTO_DOFN_URN = "beam:dofn:js_window_into:v1";
 export const JS_ASSIGN_TIMESTAMPS_DOFN_URN =
   "beam:dofn:js_assign_timestamps:v1";
diff --git a/sdks/typescript/src/apache_beam/io/textio.ts b/sdks/typescript/src/apache_beam/io/textio.ts
index adf487d3f22..58cc5f241b4 100644
--- a/sdks/typescript/src/apache_beam/io/textio.ts
+++ b/sdks/typescript/src/apache_beam/io/textio.ts
@@ -19,28 +19,20 @@
 import * as beam from "../../apache_beam";
 import * as external from "../transforms/external";
 
-export class ReadFromText extends beam.AsyncPTransform<
-  beam.Root,
-  beam.PCollection<string>
-> {
-  constructor(private filePattern: string) {
-    super();
-  }
-
-  async asyncExpand(root: beam.Root) {
+export function readFromText(
+  filePattern: string
+): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
+  return async function readFromText(root: beam.Root) {
     return await root.asyncApply(
-      new external.RawExternalTransform<
-        beam.PValue<any>,
-        beam.PCollection<any>
-      >(
+      external.rawExternalTransform<beam.Root, beam.PCollection<any>>(
         "beam:transforms:python:fully_qualified_named",
         {
           constructor: "apache_beam.io.ReadFromText",
-          kwargs: { file_pattern: this.filePattern },
+          kwargs: { file_pattern: filePattern },
         },
         // python apache_beam/runners/portability/expansion_service_main.py --fully_qualified_name_glob='*' --port 4444 --environment_type='beam:env:embedded_python:v1'
         "localhost:4444"
       )
     );
-  }
+  };
 }
diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts
index bf4dd4ea9fd..1313f2241e7 100644
--- a/sdks/typescript/src/apache_beam/pvalue.ts
+++ b/sdks/typescript/src/apache_beam/pvalue.ts
@@ -21,10 +21,12 @@ import { Pipeline } from "./internal/pipeline";
 import {
   PTransform,
   AsyncPTransform,
+  PTransformClass,
+  AsyncPTransformClass,
   extractName,
   withName,
 } from "./transforms/transform";
-import { ParDo, DoFn } from "./transforms/pardo";
+import { parDo, DoFn } from "./transforms/pardo";
 import * as runnerApi from "./proto/beam_runner_api";
 
 /**
@@ -38,20 +40,18 @@ export class Root {
     this.pipeline = pipeline;
   }
 
-  apply<OutputT extends PValue<any>>(
-    transform: PTransform<Root, OutputT> | ((Root) => OutputT)
-  ) {
-    if (!(transform instanceof PTransform)) {
-      transform = new PTransformFromCallable(transform);
+  apply<OutputT extends PValue<any>>(transform: PTransform<Root, OutputT>) {
+    if (!(transform instanceof PTransformClass)) {
+      transform = new PTransformClassFromCallable(transform);
     }
     return this.pipeline.applyTransform(transform, this);
   }
 
   async asyncApply<OutputT extends PValue<any>>(
-    transform: AsyncPTransform<Root, OutputT> | ((Root) => Promise<OutputT>)
+    transform: AsyncPTransform<Root, OutputT>
   ) {
-    if (!(transform instanceof AsyncPTransform)) {
-      transform = new AsyncPTransformFromCallable(transform);
+    if (!(transform instanceof AsyncPTransformClass)) {
+      transform = new AsyncPTransformClassFromCallable(transform);
     }
     return await this.pipeline.asyncApplyTransform(transform, this);
   }
@@ -83,21 +83,19 @@ export class PCollection<T> {
   }
 
   apply<OutputT extends PValue<any>>(
-    transform: PTransform<PCollection<T>, OutputT> | ((PCollection) => OutputT)
+    transform: PTransform<PCollection<T>, OutputT>
   ) {
-    if (!(transform instanceof PTransform)) {
-      transform = new PTransformFromCallable(transform);
+    if (!(transform instanceof PTransformClass)) {
+      transform = new PTransformClassFromCallable(transform);
     }
     return this.pipeline.applyTransform(transform, this);
   }
 
   asyncApply<OutputT extends PValue<any>>(
-    transform:
-      | AsyncPTransform<PCollection<T>, OutputT>
-      | ((PCollection) => Promise<OutputT>)
+    transform: AsyncPTransform<PCollection<T>, OutputT>
   ) {
-    if (!(transform instanceof AsyncPTransform)) {
-      transform = new AsyncPTransformFromCallable(transform);
+    if (!(transform instanceof AsyncPTransformClass)) {
+      transform = new AsyncPTransformClassFromCallable(transform);
     }
     return this.pipeline.asyncApplyTransform(transform, this);
   }
@@ -111,7 +109,7 @@ export class PCollection<T> {
     return this.apply(
       withName(
         "map(" + extractName(fn) + ")",
-        new ParDo<T, OutputT, ContextT>(
+        parDo<T, OutputT, ContextT>(
           {
             process: function* (element: T, context: ContextT) {
               // While it's legal to call a function with extra arguments which will
@@ -136,7 +134,7 @@ export class PCollection<T> {
     return this.apply(
       withName(
         "flatMap(" + extractName(fn) + ")",
-        new ParDo<T, OutputT, ContextT>(
+        parDo<T, OutputT, ContextT>(
           {
             process: function (element: T, context: ContextT) {
               // While it's legal to call a function with extra arguments which will
@@ -158,7 +156,7 @@ export class PCollection<T> {
 }
 
 /**
- * The type of object that may be consumed or produced by a PTransform.
+ * The type of object that may be consumed or produced by a PTransformClass.
  */
 export type PValue<T> =
   | void
@@ -209,7 +207,7 @@ export function flattenPValue<T>(
  * Wraps a PValue in a single object such that a transform can be applied to it.
  *
  * For example, Flatten takes a PCollection[] as input, but Array has no
- * apply(PTransform) method, so one writes
+ * apply(PTransformClass) method, so one writes
  *
  *    P([pcA, pcB, pcC]).apply(new Flatten())
  */
@@ -221,21 +219,21 @@ class PValueWrapper<T extends PValue<any>> {
   constructor(private pvalue: T) {}
 
   apply<O extends PValue<any>>(
-    transform: PTransform<T, O> | ((input: T) => O),
+    transform: PTransform<T, O>,
     root: Root | null = null
   ) {
-    if (!(transform instanceof PTransform)) {
-      transform = new PTransformFromCallable(transform);
+    if (!(transform instanceof PTransformClass)) {
+      transform = new PTransformClassFromCallable(transform);
     }
     return this.pipeline(root).applyTransform(transform, this.pvalue);
   }
 
   async asyncApply<O extends PValue<any>>(
-    transform: AsyncPTransform<T, O> | ((input: T) => Promise<O>),
+    transform: AsyncPTransform<T, O>,
     root: Root | null = null
   ) {
-    if (!(transform instanceof AsyncPTransform)) {
-      transform = new AsyncPTransformFromCallable(transform);
+    if (!(transform instanceof AsyncPTransformClass)) {
+      transform = new AsyncPTransformClassFromCallable(transform);
     }
     return await this.pipeline(root).asyncApplyTransform(
       transform,
@@ -253,35 +251,63 @@ class PValueWrapper<T extends PValue<any>> {
   }
 }
 
-class PTransformFromCallable<
+class PTransformClassFromCallable<
   InputT extends PValue<any>,
   OutputT extends PValue<any>
-> extends PTransform<InputT, OutputT> {
-  expander: (InputT) => OutputT;
+> extends PTransformClass<InputT, OutputT> {
+  expander: (
+    input: InputT,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) => OutputT;
 
-  constructor(expander: (InputT) => OutputT) {
+  constructor(
+    expander: (
+      input: InputT,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => OutputT
+  ) {
     super(extractName(expander));
     this.expander = expander;
   }
 
-  expand(input: InputT) {
-    return this.expander(input);
+  expandInternal(
+    input: InputT,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    return this.expander(input, pipeline, transformProto);
   }
 }
 
-class AsyncPTransformFromCallable<
+class AsyncPTransformClassFromCallable<
   InputT extends PValue<any>,
   OutputT extends PValue<any>
-> extends AsyncPTransform<InputT, OutputT> {
-  expander: (InputT) => Promise<OutputT>;
+> extends AsyncPTransformClass<InputT, OutputT> {
+  expander: (
+    input: InputT,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) => Promise<OutputT>;
 
-  constructor(expander: (InputT) => Promise<OutputT>) {
+  constructor(
+    expander: (
+      input: InputT,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => Promise<OutputT>
+  ) {
     super(extractName(expander));
     this.expander = expander;
   }
 
-  async asyncExpand(input: InputT) {
-    return this.expander(input);
+  async asyncExpandInternal(
+    input: InputT,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    return this.expander(input, pipeline, transformProto);
   }
 }
 
diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
index ff203d06c41..1482995eee5 100644
--- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
@@ -28,13 +28,13 @@ import { JobState_Enum } from "../proto/beam_job_api";
 
 import { Pipeline } from "../internal/pipeline";
 import { Root } from "../pvalue";
-import { Impulse, GroupByKey } from "../transforms/internal";
+import { impulse, groupByKey } from "../transforms/internal";
 import { Runner, PipelineResult } from "./runner";
 import * as worker from "../worker/worker";
 import * as operators from "../worker/operators";
 import { createStateKey } from "../worker/pardo_context";
 import * as state from "../worker/state";
-import { ParDo } from "../transforms/pardo";
+import { parDo } from "../transforms/pardo";
 import {
   Window,
   GlobalWindow,
@@ -121,7 +121,7 @@ export class DirectRunner extends Runner {
         descriptor,
         null!,
         new state.CachingStateProvider(stateProvider),
-        [Impulse.urn]
+        [impulse.urn]
       );
       await processor.process("bundle_id");
 
@@ -165,7 +165,7 @@ class DirectImpulseOperator implements operators.IOperator {
   async finishBundle() {}
 }
 
-operators.registerOperator(Impulse.urn, DirectImpulseOperator);
+operators.registerOperator(impulse.urn, DirectImpulseOperator);
 
 // Only to be used in direct runner, as this will only group within a single bundle.
 // TODO: (Extension) This could be used as a base for the PGBKOperation operator,
@@ -248,7 +248,7 @@ class DirectGbkOperator implements operators.IOperator {
   }
 }
 
-operators.registerOperator(GroupByKey.urn, DirectGbkOperator);
+operators.registerOperator(groupByKey.urn, DirectGbkOperator);
 
 /**
  * Rewrites the pipeline to be suitable for running as a single "bundle."
@@ -305,7 +305,7 @@ function rewriteSideInputs(p: runnerApi.Pipeline, pipelineStateRef: string) {
   for (const [transformId, transform] of Object.entries(transforms)) {
     if (
       transform.spec != undefined &&
-      transform.spec.urn == ParDo.urn &&
+      transform.spec.urn == parDo.urn &&
       Object.keys(transform.inputs).length > 1
     ) {
       const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload);
diff --git a/sdks/typescript/src/apache_beam/testing/assert.ts b/sdks/typescript/src/apache_beam/testing/assert.ts
index 0d6451c5b45..930ffca6931 100644
--- a/sdks/typescript/src/apache_beam/testing/assert.ts
+++ b/sdks/typescript/src/apache_beam/testing/assert.ts
@@ -17,7 +17,7 @@
  */
 
 import * as beam from "../../apache_beam";
-import { GlobalWindows } from "../../apache_beam/transforms/windowings";
+import { globalWindows } from "../../apache_beam/transforms/windowings";
 import * as internal from "../transforms/internal";
 
 import * as assert from "assert";
@@ -28,55 +28,39 @@ function callAssertDeepEqual(a, b) {
 }
 
 // TODO: (Naming)
-export class AssertDeepEqual extends beam.PTransform<
-  beam.PCollection<any>,
-  void
-> {
-  expected: any[];
-
-  constructor(expected: any[]) {
-    super("AssertDeepEqual");
-    this.expected = expected;
-  }
-
-  expand(pcoll: beam.PCollection<any>) {
-    const expected = this.expected;
+export function assertDeepEqual<T>(
+  expected: T[]
+): beam.PTransform<beam.PCollection<T>, void> {
+  return function assertDeepEqual(pcoll: beam.PCollection<T>) {
     pcoll.apply(
-      new Assert("Assert", (actual) => {
-        const actualArray: any[] = [...actual];
+      assertContentsSatisfies((actual: T[]) => {
+        const actualArray: T[] = [...actual];
         expected.sort();
         actualArray.sort();
         callAssertDeepEqual(actualArray, expected);
       })
     );
-  }
+  };
 }
 
-export class Assert extends beam.PTransform<beam.PCollection<any>, void> {
-  check: (actual: any[]) => void;
-
-  constructor(name: string, check: (actual: any[]) => void) {
-    super(name);
-    this.check = check;
-  }
-
-  expand(pcoll: beam.PCollection<any>) {
-    const check = this.check;
+export function assertContentsSatisfies<T>(
+  check: (actual: T[]) => void
+): beam.PTransform<beam.PCollection<T>, void> {
+  function expand(pcoll: beam.PCollection<T>) {
     // We provide some value here to ensure there is at least one element
     // so the DoFn gets invoked.
     const singleton = pcoll
       .root()
-      .apply(new beam.Impulse())
+      .apply(beam.impulse())
       .map((_) => ({ tag: "expected" }));
     // CoGBK.
     const tagged = pcoll
       .map((e) => ({ tag: "actual", value: e }))
-      .apply(new beam.WindowInto(new GlobalWindows()));
+      .apply(beam.windowInto(globalWindows()));
     beam
       .P([singleton, tagged])
-      .apply(new beam.Flatten())
-      .map((e) => ({ key: 0, value: e }))
-      .apply(new internal.GroupByKey()) // TODO: GroupBy.
+      .apply(beam.flatten())
+      .apply(beam.groupBy((e) => 0))
       .map(
         beam.withName("extractActual", (kv) => {
           const actual: any[] =
@@ -86,6 +70,11 @@ export class Assert extends beam.PTransform<beam.PCollection<any>, void> {
         })
       );
   }
+
+  return beam.withName(
+    `assertContentsSatisfies(${beam.extractName(check)})`,
+    expand
+  );
 }
 
 import { requireForSerialization } from "../serialization";
diff --git a/sdks/typescript/src/apache_beam/transforms/create.ts b/sdks/typescript/src/apache_beam/transforms/create.ts
index e3413d3efb3..bf5eefa673a 100644
--- a/sdks/typescript/src/apache_beam/transforms/create.ts
+++ b/sdks/typescript/src/apache_beam/transforms/create.ts
@@ -17,34 +17,19 @@
  */
 
 import { PTransform, withName } from "./transform";
-import { Impulse } from "./internal";
+import { impulse } from "./internal";
 import { Root, PCollection } from "../pvalue";
 
 /**
  * A Ptransform that represents a 'static' source with a list of elements passed at construction time. It
  * returns a PCollection that contains the elements in the input list.
- *
- * @extends PTransform
  */
-export class Create<T> extends PTransform<Root, PCollection<T>> {
-  elements: T[];
-
-  /**
-   * Construct a new Create PTransform.
-   * @param elements - the list of elements in the PCollection
-   */
-  constructor(elements: T[]) {
-    super("Create");
-    this.elements = elements;
+export function create<T>(elements: T[]): PTransform<Root, PCollection<T>> {
+  function create(root: Root): PCollection<T> {
+    return root
+      .apply(impulse())
+      .flatMap(withName("ExtractElements", (_) => elements));
   }
 
-  expand(root: Root) {
-    const elements = this.elements;
-    // TODO: (Cleanup) Store encoded values and conditionally shuffle.
-    return root.apply(new Impulse()).flatMap(
-      withName("ExtractElements", function* blarg(_) {
-        yield* elements;
-      })
-    );
-  }
+  return create;
 }
diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts
index 73ba81faca7..bfb3ebe4182 100644
--- a/sdks/typescript/src/apache_beam/transforms/external.ts
+++ b/sdks/typescript/src/apache_beam/transforms/external.ts
@@ -49,10 +49,27 @@ import * as service from "../utils/service";
 // TODO: (API) (Types) This class expects PCollections to already have the
 // correct Coders. It would be great if we could infer coders, or at least have
 // a cleaner way to specify them than using internal.WithCoderInternal.
-export class RawExternalTransform<
+export function rawExternalTransform<
   InputT extends PValue<any>,
   OutputT extends PValue<any>
-> extends transform.AsyncPTransform<InputT, OutputT> {
+>(
+  urn: string,
+  payload: Uint8Array | { [key: string]: any },
+  serviceProviderOrAddress: string | (() => Promise<service.Service>),
+  inferPValueType: boolean = true
+): transform.AsyncPTransform<InputT, OutputT> {
+  return new RawExternalTransform(
+    urn,
+    payload,
+    serviceProviderOrAddress,
+    inferPValueType
+  );
+}
+
+class RawExternalTransform<
+  InputT extends PValue<any>,
+  OutputT extends PValue<any>
+> extends transform.AsyncPTransformClass<InputT, OutputT> {
   static namespaceCounter = 0;
   static freshNamespace() {
     return "namespace_" + RawExternalTransform.namespaceCounter++ + "_";
@@ -110,7 +127,7 @@ export class RawExternalTransform<
       request.components!.transforms[fakeImpulseNamespace + pcId] =
         runnerApi.PTransform.create({
           uniqueName: fakeImpulseNamespace + "_create_" + pcId,
-          spec: { urn: internal.Impulse.urn, payload: new Uint8Array() },
+          spec: { urn: internal.impulse.urn, payload: new Uint8Array() },
           outputs: { main: pcId },
         });
     }
diff --git a/sdks/typescript/src/apache_beam/transforms/flatten.ts b/sdks/typescript/src/apache_beam/transforms/flatten.ts
index 80e80a98d19..57d29c236a5 100644
--- a/sdks/typescript/src/apache_beam/transforms/flatten.ts
+++ b/sdks/typescript/src/apache_beam/transforms/flatten.ts
@@ -17,28 +17,32 @@
  */
 
 import * as runnerApi from "../proto/beam_runner_api";
-import { PTransform } from "./transform";
+import { PTransform, withName } from "./transform";
 import { PCollection } from "../pvalue";
 import { Pipeline } from "../internal/pipeline";
 import { GeneralObjectCoder } from "../coders/js_coders";
 
-export class Flatten<T> extends PTransform<PCollection<T>[], PCollection<T>> {
-  // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
-  // TODO: (Cleanup) use above line, not below line.
-  static urn: string = "beam:transform:flatten:v1";
-  name = "Flatten";
-
-  expandInternal(
+export function flatten<T>(): PTransform<PCollection<T>[], PCollection<T>> {
+  function expandInternal(
     inputs: PCollection<T>[],
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
   ) {
     transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: Flatten.urn,
+      urn: flatten.urn,
       payload: null!,
     });
 
-    // TODO: Input coder if they're all the same? UnionCoder?
-    return pipeline.createPCollectionInternal<T>(new GeneralObjectCoder());
+    // TODO: UnionCoder if they're not the same?
+    const coders = new Set(
+      inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc))
+    );
+    const coder =
+      coders.size == 1 ? [...coders][0] : new GeneralObjectCoder<T>();
+    return pipeline.createPCollectionInternal<T>(coder);
   }
+
+  return withName("flatten", expandInternal);
 }
+
+flatten.urn = "beam:transform:flatten:v1";
diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
index 6e7072cc74c..4f7de5a0d47 100644
--- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
+++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
@@ -17,8 +17,9 @@
  */
 
 import { KV } from "../values";
-import { PTransform } from "./transform";
+import { PTransform, PTransformClass, withName } from "./transform";
 import { PCollection } from "../pvalue";
+import { PValue } from "../pvalue";
 import * as internal from "./internal";
 import { count } from "./combiners";
 
@@ -40,13 +41,13 @@ export interface CombineFn<I, A, O> {
 type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);
 
 /**
- * A PTransform that takes a PCollection of elements, and returns a PCollection
+ * A PTransformClass that takes a PCollection of elements, and returns a PCollection
  * of elements grouped by a field, multiple fields, an expression that is used
  * as the grouping key.
  *
- * @extends PTransform
+ * @extends PTransformClass
  */
-export class GroupBy<T, K> extends PTransform<
+export class GroupBy<T, K> extends PTransformClass<
   PCollection<T>,
   PCollection<KV<K, Iterable<T>>>
 > {
@@ -72,7 +73,7 @@ export class GroupBy<T, K> extends PTransform<
     const keyFn = this.keyFn;
     return input
       .map((x) => ({ key: keyFn(x), value: x }))
-      .apply(new internal.GroupByKey());
+      .apply(internal.groupByKey());
   }
 
   combining<I>(
@@ -88,6 +89,13 @@ export class GroupBy<T, K> extends PTransform<
   }
 }
 
+export function groupBy<T, K>(
+  key: string | string[] | ((element: T) => K),
+  keyName: string | undefined = undefined
+): GroupBy<T, K> {
+  return new GroupBy<T, K>(key, keyName);
+}
+
 /**
  * Groups all elements of the input PCollection together.
  *
@@ -95,7 +103,7 @@ export class GroupBy<T, K> extends PTransform<
  * loses parallelization benefits in bringing all elements of a distributed
  * PCollection together on a single machine.
  */
-export class GroupGlobally<T> extends PTransform<
+export class GroupGlobally<T> extends PTransformClass<
   PCollection<T>,
   PCollection<Iterable<T>>
 > {
@@ -120,7 +128,11 @@ export class GroupGlobally<T> extends PTransform<
   }
 }
 
-class GroupByAndCombine<T, O> extends PTransform<
+export function groupGlobally<T>() {
+  return new GroupGlobally<T>();
+}
+
+class GroupByAndCombine<T, O> extends PTransformClass<
   PCollection<T>,
   PCollection<O>
 > {
@@ -167,8 +179,8 @@ class GroupByAndCombine<T, O> extends PTransform<
         };
       })
       .apply(
-        new internal.CombinePerKey(
-          new MultiCombineFn(this_.combiners.map((c) => c.combineFn))
+        internal.combinePerKey(
+          multiCombineFn(this_.combiners.map((c) => c.combineFn))
         )
       )
       .map(function constructResult(kv) {
@@ -190,34 +202,30 @@ class GroupByAndCombine<T, O> extends PTransform<
   }
 }
 
-// TODO: (API) Does this carry its weight as a top-level built-in function?
-// Cons: It's just a combine. Pros: It's kind of a non-obvious one.
-// NOTE: The encoded form of the elements will be used for equality checking.
-export class CountPerElement<T> extends PTransform<
+export function countPerElement<T>(): PTransform<
   PCollection<T>,
   PCollection<{ element: T; count: number }>
 > {
-  expand(input) {
-    return input.apply(
-      new GroupBy((e) => e, "element").combining((e) => e, count, "count")
-    );
-  }
+  return withName(
+    "countPerElement",
+    groupBy((e) => e, "element").combining((e) => e, count, "count")
+  );
 }
 
-export class CountGlobally<T> extends PTransform<
+export function countGlobally<T>(): PTransform<
   PCollection<T>,
   PCollection<number>
 > {
-  expand(input) {
-    return input
+  return withName("countGlobally", (input) =>
+    input
       .apply(new GroupGlobally().combining((e) => e, count, "count"))
-      .map((o) => o.count);
-  }
+      .map((o) => o.count)
+  );
 }
 
 function toCombineFn<I>(combiner: Combiner<I>): CombineFn<I, any, any> {
   if (typeof combiner == "function") {
-    return new BinaryCombineFn<I>(combiner);
+    return binaryCombineFn<I>(combiner);
   } else {
     return combiner;
   }
@@ -229,71 +237,62 @@ interface CombineSpec<T, I, O> {
   resultName: string;
 }
 
-class BinaryCombineFn<I> implements CombineFn<I, I | undefined, I> {
-  constructor(private combiner: (a: I, b: I) => I) {}
-  createAccumulator() {
-    return undefined;
-  }
-  addInput(a, b) {
-    if (a == undefined) {
-      return b;
-    } else {
-      return this.combiner(a, b);
-    }
-  }
-  mergeAccumulators(accs) {
-    return accs.filter((a) => a != undefined).reduce(this.combiner, undefined);
-  }
-  extractOutput(a) {
-    return a;
-  }
+function binaryCombineFn<I>(
+  combiner: (a: I, b: I) => I
+): CombineFn<I, I | undefined, I> {
+  return {
+    createAccumulator: () => undefined,
+    addInput: (a, b) => (a === undefined ? b : combiner(a, b)),
+    mergeAccumulators: (accs) =>
+      [...accs].filter((a) => a != undefined).reduce(combiner, undefined),
+    extractOutput: (a) => a,
+  };
 }
 
-class MultiCombineFn implements CombineFn<any[], any[], any[]> {
-  batchSize: number = 100;
-  // TODO: (Typescript) Is there a way to indicate type parameters match the above?
-  constructor(private combineFns: CombineFn<any, any, any>[]) {}
+// TODO: (Typescript) Is there a way to indicate type parameters match the above?
+function multiCombineFn(
+  combineFns: CombineFn<any, any, any>[],
+  batchSize: number = 100
+): CombineFn<any[], any[], any[]> {
+  return {
+    createAccumulator: () => combineFns.map((fn) => fn.createAccumulator()),
 
-  createAccumulator() {
-    return this.combineFns.map((fn) => fn.createAccumulator());
-  }
-
-  addInput(accumulators: any[], inputs: any[]) {
-    // TODO: (Cleanup) Does javascript have a clean zip?
-    let result: any[] = [];
-    for (let i = 0; i < this.combineFns.length; i++) {
-      result.push(this.combineFns[i].addInput(accumulators[i], inputs[i]));
-    }
-    return result;
-  }
+    addInput: (accumulators: any[], inputs: any[]) => {
+      // TODO: (Cleanup) Does javascript have a clean zip?
+      let result: any[] = [];
+      for (let i = 0; i < combineFns.length; i++) {
+        result.push(combineFns[i].addInput(accumulators[i], inputs[i]));
+      }
+      return result;
+    },
 
-  mergeAccumulators(accumulators: Iterable<any[]>) {
-    const combineFns = this.combineFns;
-    let batches = combineFns.map((fn) => [fn.createAccumulator()]);
-    for (let acc of accumulators) {
+    mergeAccumulators: (accumulators: Iterable<any[]>) => {
+      let batches = combineFns.map((fn) => [fn.createAccumulator()]);
+      for (let acc of accumulators) {
+        for (let i = 0; i < combineFns.length; i++) {
+          batches[i].push(acc[i]);
+          if (batches[i].length > batchSize) {
+            batches[i] = [combineFns[i].mergeAccumulators(batches[i])];
+          }
+        }
+      }
       for (let i = 0; i < combineFns.length; i++) {
-        batches[i].push(acc[i]);
-        if (batches[i].length > this.batchSize) {
+        if (batches[i].length > 1) {
           batches[i] = [combineFns[i].mergeAccumulators(batches[i])];
         }
       }
-    }
-    for (let i = 0; i < combineFns.length; i++) {
-      if (batches[i].length > 1) {
-        batches[i] = [combineFns[i].mergeAccumulators(batches[i])];
-      }
-    }
-    return batches.map((batch) => batch[0]);
-  }
+      return batches.map((batch) => batch[0]);
+    },
 
-  extractOutput(accumulators: any[]) {
-    // TODO: (Cleanup) Does javascript have a clean zip?
-    let result: any[] = [];
-    for (let i = 0; i < this.combineFns.length; i++) {
-      result.push(this.combineFns[i].extractOutput(accumulators[i]));
-    }
-    return result;
-  }
+    extractOutput: (accumulators: any[]) => {
+      // TODO: (Cleanup) Does javascript have a clean zip?
+      let result: any[] = [];
+      for (let i = 0; i < combineFns.length; i++) {
+        result.push(combineFns[i].extractOutput(accumulators[i]));
+      }
+      return result;
+    },
+  };
 }
 
 // TODO: (Typescript) Can I type T as "something that has this key" and/or,
@@ -330,7 +329,5 @@ function extractFn<T, K>(extractor: string | string[] | ((T) => K)) {
 import { requireForSerialization } from "../serialization";
 requireForSerialization("apache_beam.transforms.pardo", exports);
 requireForSerialization("apache_beam.transforms.pardo", {
-  BinaryCombineFn: BinaryCombineFn,
   GroupByAndCombine: GroupByAndCombine,
-  MultiCombineFn: MultiCombineFn,
 });
diff --git a/sdks/typescript/src/apache_beam/transforms/index.ts b/sdks/typescript/src/apache_beam/transforms/index.ts
index c659f87204f..f0d49a6f5c3 100644
--- a/sdks/typescript/src/apache_beam/transforms/index.ts
+++ b/sdks/typescript/src/apache_beam/transforms/index.ts
@@ -23,7 +23,7 @@ export * from "./group_and_combine";
 export * from "./pardo";
 export * from "./transform";
 export * from "./window";
-export { Impulse } from "./internal";
+export { impulse, withRowCoder } from "./internal";
 
 import { requireForSerialization } from "../serialization";
 requireForSerialization("apache_beam.transforms", exports);
diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts
index 27f03c836ab..1e257704cd1 100644
--- a/sdks/typescript/src/apache_beam/transforms/internal.ts
+++ b/sdks/typescript/src/apache_beam/transforms/internal.ts
@@ -19,75 +19,82 @@
 import * as runnerApi from "../proto/beam_runner_api";
 import * as urns from "../internal/urns";
 
-import { PTransform, withName } from "./transform";
+import {
+  PTransform,
+  PTransformClass,
+  withName,
+  extractName,
+} from "./transform";
 import { PCollection, Root } from "../pvalue";
 import { Pipeline } from "../internal/pipeline";
 import { Coder } from "../coders/coders";
 import { BytesCoder, KVCoder, IterableCoder } from "../coders/required_coders";
-import { ParDo } from "./pardo";
+import { parDo } from "./pardo";
 import { GeneralObjectCoder } from "../coders/js_coders";
+import { RowCoder } from "../coders/row_coder";
 import { KV } from "../values";
 import { CombineFn } from "./group_and_combine";
 
 /**
- * `Impulse` is the basic *source* primitive `PTransform`. It receives a Beam
+ * `Impulse` is the basic *source* primitive `PTransform`. It receives a Beam
  * Root as input, and returns a `PCollection` of `Uint8Array` with a single
  * element with length=0 (i.e. the empty byte array: `new Uint8Array("")`).
  *
  * `Impulse` is used to start the execution of a pipeline with a single element
  * that can trigger execution of a source or SDF.
  */
-export class Impulse extends PTransform<Root, PCollection<Uint8Array>> {
-  // static urn: string = runnerApi.StandardPTransforms_Primitives.IMPULSE.urn;
-  // TODO: (Cleanup) use above line, not below line.
-  static urn: string = "beam:transform:impulse:v1";
-
-  constructor() {
-    super("Impulse"); // TODO: (Unique names) pass null/nothing and get from reflection
-  }
-
-  expandInternal(
+export function impulse(): PTransform<Root, PCollection<Uint8Array>> {
+  function expandInternal(
     input: Root,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
-  ): PCollection<Uint8Array> {
+  ) {
     transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: Impulse.urn,
+      urn: impulse.urn,
       payload: urns.IMPULSE_BUFFER,
     });
     transformProto.environmentId = "";
     return pipeline.createPCollectionInternal(new BytesCoder());
   }
+
+  return withName("impulse", expandInternal);
 }
 
+impulse.urn = "beam:transform:impulse:v1";
+
 // TODO: (API) Should we offer a method on PCollection to do this?
-export class WithCoderInternal<T> extends PTransform<
-  PCollection<T>,
-  PCollection<T>
-> {
-  constructor(private coder: Coder<T>) {
-    super("WithCoderInternal(" + coder + ")");
-  }
-  expandInternal(
-    input: PCollection<T>,
-    pipeline: Pipeline,
-    transformProto: runnerApi.PTransform
-  ) {
-    // IDENTITY rather than Flatten for better fusion.
-    transformProto.spec = {
-      urn: ParDo.urn,
-      payload: runnerApi.ParDoPayload.toBinary(
-        runnerApi.ParDoPayload.create({
-          doFn: runnerApi.FunctionSpec.create({
-            urn: urns.IDENTITY_DOFN_URN,
-            payload: undefined!,
-          }),
-        })
-      ),
-    };
-
-    return pipeline.createPCollectionInternal<T>(this.coder);
-  }
+export function withCoderInternal<T>(
+  coder: Coder<T>
+): PTransform<PCollection<T>, PCollection<T>> {
+  return withName(
+    `withCoderInternal(${extractName(coder)})`,
+    (
+      input: PCollection<T>,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => {
+      // IDENTITY rather than Flatten for better fusion.
+      transformProto.spec = {
+        urn: parDo.urn,
+        payload: runnerApi.ParDoPayload.toBinary(
+          runnerApi.ParDoPayload.create({
+            doFn: runnerApi.FunctionSpec.create({
+              urn: urns.IDENTITY_DOFN_URN,
+              payload: undefined!,
+            }),
+          })
+        ),
+      };
+
+      return pipeline.createPCollectionInternal<T>(coder);
+    }
+  );
+}
+
+export function withRowCoder<T extends Object>(
+  exemplar: T
+): PTransform<PCollection<T>, PCollection<T>> {
+  return withCoderInternal(RowCoder.fromJSON(exemplar));
 }
 
 /**
@@ -101,15 +108,11 @@ export class WithCoderInternal<T> extends PTransform<
  * `GroupByKey` operations are used under the hood to execute combines,
  * streaming triggers, stateful transforms, etc.
  */
-export class GroupByKey<K, V> extends PTransform<
+export function groupByKey<K, V>(): PTransform<
   PCollection<KV<K, V>>,
   PCollection<KV<K, Iterable<V>>>
 > {
-  // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
-  // TODO: (Cleanup) use above line, not below line.
-  static urn: string = "beam:transform:group_by_key:v1";
-
-  expandInternal(
+  function expandInternal(
     input: PCollection<KV<K, V>>,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
@@ -124,15 +127,15 @@ export class GroupByKey<K, V> extends PTransform<
     if (inputCoderProto.spec!.urn != KVCoder.URN) {
       return input
         .apply(
-          new WithCoderInternal(
+          withCoderInternal(
             new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder())
           )
         )
-        .apply(new GroupByKey());
+        .apply(groupByKey());
     }
 
     transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: GroupByKey.urn,
+      urn: groupByKey.urn,
       payload: undefined!,
     });
     transformProto.environmentId = "";
@@ -144,8 +147,13 @@ export class GroupByKey<K, V> extends PTransform<
     const outputCoder = new KVCoder(keyCoder, iterableValueCoder);
     return pipeline.createPCollectionInternal(outputCoder);
   }
+
+  return withName("groupByKey", expandInternal);
 }
 
+// TODO: (Cleanup) use runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn.
+groupByKey.urn = "beam:transform:group_by_key:v1";
+
 /**
  * This transform is used to perform aggregations over groups of elements.
  *
@@ -159,28 +167,24 @@ export class GroupByKey<K, V> extends PTransform<
  * before a `GroupByKey` and after the `GroupByKey`. The partial aggregations
  * help reduce the original data into a single aggregator per key per worker.
  */
-export class CombinePerKey<K, InputT, AccT, OutputT> extends PTransform<
-  PCollection<KV<K, InputT>>,
-  PCollection<KV<K, OutputT>>
-> {
-  constructor(private combineFn: CombineFn<InputT, AccT, OutputT>) {
-    super();
+export function combinePerKey<K, InputT, AccT, OutputT>(
+  combineFn: CombineFn<InputT, AccT, OutputT>
+): PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
+  function expandInternal(input: PCollection<KV<any, InputT>>) {
+    return input //
+      .apply(groupByKey())
+      .map(
+        withName("applyCombine", (kv) => ({
+          key: kv.key,
+          value: combineFn.extractOutput(
+            kv.value.reduce(
+              combineFn.addInput.bind(combineFn),
+              combineFn.createAccumulator()
+            )
+          ),
+        }))
+      );
   }
 
-  // Let the runner do the combiner lifting, when possible, handling timestamps,
-  // windowing, and triggering as needed.
-  expand(input: PCollection<KV<any, InputT>>) {
-    const combineFn = this.combineFn;
-    return input.apply(new GroupByKey()).map(
-      withName("applyCombine", (kv) => ({
-        key: kv.key,
-        value: combineFn.extractOutput(
-          kv.value.reduce(
-            combineFn.addInput.bind(combineFn),
-            combineFn.createAccumulator()
-          )
-        ),
-      }))
-    );
-  }
+  return withName(`combinePerKey(${extractName(combineFn)})`, expandInternal);
 }
diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts
index d6c550299b9..e05c0cb14f5 100644
--- a/sdks/typescript/src/apache_beam/transforms/pardo.ts
+++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts
@@ -23,7 +23,12 @@ import { GeneralObjectCoder } from "../coders/js_coders";
 import { PCollection } from "../pvalue";
 import { Pipeline } from "../internal/pipeline";
 import { serializeFn } from "../internal/serialize";
-import { PTransform, extractName } from "./transform";
+import {
+  PTransform,
+  PTransformClass,
+  withName,
+  extractName,
+} from "./transform";
 import { PaneInfo, Instant, Window, WindowedValue } from "../values";
 
 export interface DoFn<InputT, OutputT, ContextT = undefined> {
@@ -40,46 +45,36 @@ export interface DoFn<InputT, OutputT, ContextT = undefined> {
 // TODO: (API) Do we need an AsyncDoFn (and async[Flat]Map) to be able to call
 // async functions in the body of the fns. Or can they always be Async?
 // The latter seems to have perf issues.
-// (For PTransforms, it's a major usability issue, but maybe we can always
+// (For PTransforms, it's a major usability issue, but maybe we can always
 // await when calling user code.  OTOH, I don't know what the performance
 // impact would be for creating promises for every element of every operation
 // which is typically a very performance critical spot to optimize.)
 
-export class ParDo<InputT, OutputT, ContextT = undefined> extends PTransform<
-  PCollection<InputT>,
-  PCollection<OutputT>
-> {
-  private doFn: DoFn<InputT, OutputT, ContextT>;
-  private context: ContextT;
-  // static urn: string = runnerApi.StandardPTransforms_Primitives.PAR_DO.urn;
-  // TODO: (Cleanup) use above line, not below line.
-  static urn: string = "beam:transform:pardo:v1";
-  // TODO: (Typescript) Can the arg be optional iff ContextT is undefined?
-  constructor(
-    doFn: DoFn<InputT, OutputT, ContextT>,
-    contextx: ContextT = undefined!
-  ) {
-    super(() => "ParDo(" + extractName(doFn) + ")");
-    this.doFn = doFn;
-    this.context = contextx;
-  }
-
-  expandInternal(
+// TODO: (Typescript) Can the context arg be optional iff ContextT is undefined?
+export function parDo<
+  InputT,
+  OutputT,
+  ContextT extends Object | undefined = undefined
+>(
+  doFn: DoFn<InputT, OutputT, ContextT>,
+  context: ContextT = undefined!
+): PTransform<PCollection<InputT>, PCollection<OutputT>> {
+  function expandInternal(
     input: PCollection<InputT>,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
   ) {
     // Extract and populate side inputs from the context.
-    var context;
     const sideInputs = {};
-    if (typeof this.context == "object") {
-      context = Object.create(this.context as Object);
+    var contextCopy;
+    if (typeof context == "object") {
+      contextCopy = Object.create(context as Object) as any;
       const components = pipeline.context.components;
-      for (const [name, value] of Object.entries(this.context)) {
+      for (const [name, value] of Object.entries(context)) {
         if (value instanceof SideInputParam) {
           const inputName = "side." + name;
           transformProto.inputs[inputName] = value.pcoll.getId();
-          context[name] = copySideInputWithId(value, inputName);
+          contextCopy[name] = copySideInputWithId(value, inputName);
           const mainWindowingStrategyId =
             components.pcollections[input.getId()].windowingStrategyId;
           const sideWindowingStrategyId =
@@ -109,23 +104,23 @@ export class ParDo<InputT, OutputT, ContextT = undefined> extends PTransform<
             },
           };
         } else {
-          context[name] = value;
+          contextCopy[name] = value;
         }
       }
     } else {
-      context = this.context;
+      contextCopy = context;
     }
 
     // Now finally construct the proto.
     transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: ParDo.urn,
+      urn: parDo.urn,
       payload: runnerApi.ParDoPayload.toBinary(
         runnerApi.ParDoPayload.create({
           doFn: runnerApi.FunctionSpec.create({
             urn: urns.SERIALIZED_JS_DOFN_INFO,
             payload: serializeFn({
-              doFn: this.doFn,
-              context: context,
+              doFn: doFn,
+              context: contextCopy,
             }),
           }),
           sideInputs: sideInputs,
@@ -140,89 +135,68 @@ export class ParDo<InputT, OutputT, ContextT = undefined> extends PTransform<
       new GeneralObjectCoder()
     );
   }
+
+  return withName(`parDo(${extractName(doFn)})`, expandInternal);
 }
 
+// TODO: (Cleanup) use runnerApi.StandardPTransforms_Primitives.PAR_DO.urn.
+parDo.urn = "beam:transform:pardo:v1";
+
+export type SplitOptions = {
+  knownTags?: string[];
+  unknownTagBehavior?: "error" | "ignore" | "rename" | undefined;
+  unknownTagName?: string;
+  exclusive?: boolean;
+};
+
+/**
+ * Splits a single PCollection of objects, with keys k, into an object of
+ * PCollections, with the same keys k, where each PCollection consists of the
+ * values associated with that key. That is,
+ *
+ * PCollection<{a: T, b: U, ...}> maps to {a: PCollection<T>, b: PCollection<U>, ...}
+ */
 // TODO: (API) Consider as top-level method.
 // TODO: Naming.
-// TODO: Allow default?  Technically splitter can be implemented/wrapped to produce such.
-// TODO: Can we enforce splitter's output with the typing system to lie in targets?
-// TODO: (Optionally?) delete the switched-on field.
-// TODO: (API) Consider doing
-//     [{a: aValue}, {g: bValue}, ...] => a: [aValue, ...], b: [bValue, ...]
-// instead of
-//     [{key: 'a', aValueFields}, {key: 'b', bValueFields}, ...] =>
-//          a: [{key: 'a', aValueFields}, ...], b: [{key: 'b', aValueFields}, ...],
-// (implemented below as Split2).
-export class Split<T> extends PTransform<
-  PCollection<T>,
-  { [key: string]: PCollection<T> }
-> {
-  private tags: string[];
-  constructor(private splitter: (T) => string, ...tags: string[]) {
-    super("Split(" + tags + ")");
-    this.tags = tags;
-  }
-  expandInternal(
+export function split<T extends { [key: string]: unknown }>(
+  tags: string[],
+  options: SplitOptions = {}
+): PTransform<PCollection<T>, { [P in keyof T]: PCollection<T[P]> }> {
+  function expandInternal(
     input: PCollection<T>,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
   ) {
-    transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: ParDo.urn,
-      payload: runnerApi.ParDoPayload.toBinary(
-        runnerApi.ParDoPayload.create({
-          doFn: runnerApi.FunctionSpec.create({
-            urn: urns.SPLITTING_JS_DOFN_URN,
-            payload: serializeFn({ splitter: this.splitter }),
-          }),
-        })
-      ),
-    });
-
-    const this_ = this;
-    return Object.fromEntries(
-      this_.tags.map((tag) => [
-        tag,
-        pipeline.createPCollectionInternal<T>(
-          pipeline.context.getPCollectionCoderId(input)
-        ),
-      ])
-    );
-  }
-}
+    if (options.exclusive === undefined) {
+      options.exclusive = true;
+    }
+    if (options.unknownTagBehavior === undefined) {
+      options.unknownTagBehavior = "error";
+    }
+    if (
+      options.unknownTagBehavior == "rename" &&
+      !tags.includes(options.unknownTagName!)
+    ) {
+      tags.push(options.unknownTagName!);
+    }
+    if (options.knownTags === undefined) {
+      options.knownTags = tags;
+    }
 
-// TODO: (Typescript) Is it possible to type that this takes
-// PCollection<{a: T, b: U, ...}> to {a: PCollection<T>, b: PCollection<U>, ...}
-// Seems to requires a cast inside expandInternal. But at least the cast is contained there.
-export class Split2<T extends { [key: string]: unknown }> extends PTransform<
-  PCollection<T>,
-  { [P in keyof T]: PCollection<T[P]> }
-> {
-  private tags: string[];
-  constructor(...tags: string[]) {
-    super("Split2(" + tags + ")");
-    this.tags = tags;
-  }
-  expandInternal(
-    input: PCollection<T>,
-    pipeline: Pipeline,
-    transformProto: runnerApi.PTransform
-  ) {
     transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: ParDo.urn,
+      urn: parDo.urn,
       payload: runnerApi.ParDoPayload.toBinary(
         runnerApi.ParDoPayload.create({
           doFn: runnerApi.FunctionSpec.create({
-            urn: urns.SPLITTING2_JS_DOFN_URN,
-            payload: new Uint8Array(),
+            urn: urns.SPLITTING_JS_DOFN_URN,
+            payload: serializeFn(options),
           }),
         })
       ),
     });
 
-    const this_ = this;
     return Object.fromEntries(
-      this_.tags.map((tag) => [
+      tags.map((tag) => [
         tag,
         pipeline.createPCollectionInternal<T[typeof tag]>(
           pipeline.context.getPCollectionCoderId(input)
@@ -230,6 +204,8 @@ export class Split2<T extends { [key: string]: unknown }> extends PTransform<
       ])
     ) as { [P in keyof T]: PCollection<T[P]> };
   }
+
+  return withName(`Split(${tags})`, expandInternal);
 }
 
 /*
@@ -238,15 +214,11 @@ export class Split2<T extends { [key: string]: unknown }> extends PTransform<
  * special `lookup` method to retrieve the relevant value associated with the
  * currently-being-processed element.
  */
-export abstract class ParDoParam<T> {
-  readonly parDoParamName: string;
-
+export class ParDoParam<T> {
   // Provided externally.
   private provider: ParamProvider | undefined;
 
-  constructor(parDoParamName: string) {
-    this.parDoParamName = parDoParamName;
-  }
+  constructor(readonly parDoParamName: string) {}
 
   // TODO: Nameing "get" seems to be special.
   lookup(): T {
@@ -266,23 +238,17 @@ export interface ParamProvider {
   provide<T>(param: ParDoParam<T>): T;
 }
 
-export class WindowParam extends ParDoParam<Window> {
-  constructor() {
-    super("window");
-  }
+export function windowParam(): ParDoParam<Window> {
+  return new ParDoParam<Window>("window");
 }
 
-export class TimestampParam extends ParDoParam<Instant> {
-  constructor() {
-    super("timestamp");
-  }
+export function timestampParam(): ParDoParam<Instant> {
+  return new ParDoParam<Instant>("timestamp");
 }
 
 // TODO: Naming. Should this be PaneParam?
-export class PaneInfoParam extends ParDoParam<PaneInfo> {
-  constructor() {
-    super("paneinfo");
-  }
+export function paneInfoParam(): ParDoParam<PaneInfo> {
+  return new ParDoParam<PaneInfo>("paneinfo");
 }
 
 interface SideInputAccessor<PCollT, AccessorT, ValueT> {
@@ -330,35 +296,32 @@ function copySideInputWithId<PCollT, AccessorT, ValueT>(
   return copy;
 }
 
-export class IterableSideInput<T> extends SideInputParam<
-  T,
-  Iterable<T>,
-  Iterable<T>
-> {
-  constructor(pcoll: PCollection<T>) {
-    super(pcoll, {
-      accessPattern: "beam:side_input:iterable:v1",
-      toValue: (iter: Iterable<T>) => iter,
-    });
-  }
+export function iterableSideInput<T>(
+  pcoll: PCollection<T>
+): SideInputParam<T, Iterable<T>, Iterable<T>> {
+  return new SideInputParam<T, Iterable<T>, Iterable<T>>(pcoll, {
+    accessPattern: "beam:side_input:iterable:v1",
+    toValue: (iter: Iterable<T>) => iter,
+  });
 }
 
-export class SingletonSideInput<T> extends SideInputParam<T, Iterable<T>, T> {
-  constructor(pcoll: PCollection<T>, defaultValue: T | undefined = undefined) {
-    super(pcoll, {
-      accessPattern: "beam:side_input:iterable:v1",
-      toValue: (iter: Iterable<T>) => {
-        const asArray = Array.from(iter);
-        if (asArray.length == 0 && defaultValue != undefined) {
-          return defaultValue;
-        } else if (asArray.length == 1) {
-          return asArray[0];
-        } else {
-          throw new Error("Expected a single element, got " + asArray.length);
-        }
-      },
-    });
-  }
+export function singletonSideInput<T>(
+  pcoll: PCollection<T>,
+  defaultValue: T | undefined = undefined
+): SideInputParam<T, Iterable<T>, T> {
+  return new SideInputParam<T, Iterable<T>, T>(pcoll, {
+    accessPattern: "beam:side_input:iterable:v1",
+    toValue: (iter: Iterable<T>) => {
+      const asArray = Array.from(iter);
+      if (asArray.length == 0 && defaultValue != undefined) {
+        return defaultValue;
+      } else if (asArray.length == 1) {
+        return asArray[0];
+      } else {
+        throw new Error("Expected a single element, got " + asArray.length);
+      }
+    },
+  });
 }
 
 // TODO: (Extension) Map side inputs.
diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts
index 33a92423c58..417a277609c 100644
--- a/sdks/typescript/src/apache_beam/transforms/sql.ts
+++ b/sdks/typescript/src/apache_beam/transforms/sql.ts
@@ -31,25 +31,23 @@ import { serviceProviderFromJavaGradleTarget } from "../utils/service";
  * corresponding names can be used in the sql statement.
  *
  * The input(s) must be schema'd (i.e. use the RowCoder). This can be done
- * by explicitly setting the schema with internal.WithCoderInternal or passing
+ * by explicitly setting the schema with internal.withRowCoder or passing
  * a prototype element in as a second argument, e.g.
  *
  * pcoll.applyAsync(
- *    new SqlTransform(
+ *    sqlTransform(
  *        "select a, b from PCOLLECTION",
  *        {a: 0, b: "string"},
  *    ));
  */
-export class SqlTransform<
+export function sqlTransform<
   InputT extends PCollection<any> | { [key: string]: PCollection<any> }
-> extends transform.AsyncPTransform<InputT, PCollection<any>> {
+>(
+  query: string,
+  inputTypes = null
+): transform.AsyncPTransform<InputT, PCollection<any>> {
   // TOOD: (API) (Typescript): How to infer input_types, or at least make it optional.
-  constructor(private query: string, private inputTypes = null) {
-    // TODO: Unique names. Should we truncate/omit the full SQL statement?
-    super("Sql(" + query + ")");
-  }
-
-  async asyncExpand(input: InputT): Promise<PCollection<any>> {
+  async function expandInternal(input: InputT): Promise<PCollection<any>> {
     function withCoder<T>(pcoll: PCollection<T>, type): PCollection<T> {
       if (type == null) {
         if (
@@ -67,33 +65,30 @@ export class SqlTransform<
         }
         return pcoll;
       }
-      return pcoll.apply(
-        new internal.WithCoderInternal(row_coder.RowCoder.fromJSON(type))
-      );
+      return pcoll.apply(internal.withRowCoder(type));
     }
 
     if (input instanceof PCollection) {
-      input = withCoder(input, this.inputTypes) as InputT;
+      input = withCoder(input, inputTypes) as InputT;
     } else {
       input = Object.fromEntries(
         Object.keys(input).map((tag) => [
           tag,
-          withCoder(
-            input[tag],
-            this.inputTypes == null ? null : this.inputTypes[tag]
-          ),
+          withCoder(input[tag], inputTypes == null ? null : inputTypes[tag]),
         ])
       ) as InputT;
     }
 
     return await P(input).asyncApply(
-      new external.RawExternalTransform(
+      external.rawExternalTransform(
         "beam:external:java:sql:v1",
-        { query: this.query },
+        { query: query },
         serviceProviderFromJavaGradleTarget(
           "sdks:java:extensions:sql:expansion-service:shadowJar"
         )
       )
     );
   }
+
+  return transform.withName(`sqlTransform(${query})`, expandInternal);
 }
diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts
index 4d7fcfcdc02..55b1da53a75 100644
--- a/sdks/typescript/src/apache_beam/transforms/transform.ts
+++ b/sdks/typescript/src/apache_beam/transforms/transform.ts
@@ -63,7 +63,7 @@ export function extractName<T>(withName: T): string {
 // call rather than forcing the asynchronous nature all the way up the call
 // hierarchy).
 
-export class AsyncPTransform<
+export class AsyncPTransformClass<
   InputT extends PValue<any>,
   OutputT extends PValue<any>
 > {
@@ -86,10 +86,10 @@ export class AsyncPTransform<
   }
 }
 
-export class PTransform<
+export class PTransformClass<
   InputT extends PValue<any>,
   OutputT extends PValue<any>
-> extends AsyncPTransform<InputT, OutputT> {
+> extends AsyncPTransformClass<InputT, OutputT> {
   expand(input: InputT): OutputT {
     throw new Error("Method expand has not been implemented.");
   }
@@ -114,3 +114,27 @@ export class PTransform<
     return this.expandInternal(input, pipeline, transformProto);
   }
 }
+
+export type AsyncPTransform<
+  InputT extends PValue<any>,
+  OutputT extends PValue<any>
+> =
+  | AsyncPTransformClass<InputT, OutputT>
+  | ((input: InputT) => Promise<OutputT>)
+  | ((
+      input: InputT,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => Promise<OutputT>);
+
+export type PTransform<
+  InputT extends PValue<any>,
+  OutputT extends PValue<any>
+> =
+  | PTransformClass<InputT, OutputT>
+  | ((input: InputT) => OutputT)
+  | ((
+      input: InputT,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => OutputT);
diff --git a/sdks/typescript/src/apache_beam/transforms/window.ts b/sdks/typescript/src/apache_beam/transforms/window.ts
index 0d66bcbb949..57a335ba580 100644
--- a/sdks/typescript/src/apache_beam/transforms/window.ts
+++ b/sdks/typescript/src/apache_beam/transforms/window.ts
@@ -19,12 +19,12 @@
 import * as runnerApi from "../proto/beam_runner_api";
 import * as urns from "../internal/urns";
 
-import { PTransform } from "./transform";
+import { PTransform, withName, extractName } from "./transform";
 import { Coder } from "../coders/coders";
 import { Window } from "../values";
 import { PCollection } from "../pvalue";
 import { Pipeline } from "../internal/pipeline";
-import { ParDo } from "./pardo";
+import { parDo } from "./pardo";
 import { serializeFn } from "../internal/serialize";
 
 export interface WindowFn<W extends Window> {
@@ -35,108 +35,96 @@ export interface WindowFn<W extends Window> {
   assignsToOneWindow: () => boolean;
 }
 
-export class WindowInto<T, W extends Window> extends PTransform<
-  PCollection<T>,
-  PCollection<T>
-> {
-  static createWindowingStrategy(
-    pipeline: Pipeline,
-    windowFn: WindowFn<any>,
-    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
-  ): runnerApi.WindowingStrategy {
-    let result: runnerApi.WindowingStrategy;
-    if (windowingStrategyBase == undefined) {
-      result = {
-        windowFn: undefined!,
-        windowCoderId: undefined!,
-        mergeStatus: undefined!,
-        assignsToOneWindow: undefined!,
-        trigger: { trigger: { oneofKind: "default", default: {} } },
-        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
-        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
-        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
-        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
-        allowedLateness: BigInt(0),
-        environmentId: pipeline.defaultEnvironment,
-      };
-    } else {
-      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
-    }
-    result.windowFn = windowFn.toProto();
-    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
-    result.mergeStatus = windowFn.isMerging()
-      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
-      : runnerApi.MergeStatus_Enum.NON_MERGING;
-    result.assignsToOneWindow = windowFn.assignsToOneWindow();
-    return result;
-  }
-
-  constructor(
-    private windowFn: WindowFn<W>,
-    private windowingStrategyBase:
-      | runnerApi.WindowingStrategy
-      | undefined = undefined
-  ) {
-    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+export function createWindowingStrategyProto(
+  pipeline: Pipeline,
+  windowFn: WindowFn<any>,
+  windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+): runnerApi.WindowingStrategy {
+  let result: runnerApi.WindowingStrategy;
+  if (windowingStrategyBase == undefined) {
+    result = {
+      windowFn: undefined!,
+      windowCoderId: undefined!,
+      mergeStatus: undefined!,
+      assignsToOneWindow: undefined!,
+      trigger: { trigger: { oneofKind: "default", default: {} } },
+      accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+      outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+      closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+      onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+      allowedLateness: BigInt(0),
+      environmentId: pipeline.defaultEnvironment,
+    };
+  } else {
+    result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
   }
+  result.windowFn = windowFn.toProto();
+  result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+  result.mergeStatus = windowFn.isMerging()
+    ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+    : runnerApi.MergeStatus_Enum.NON_MERGING;
+  result.assignsToOneWindow = windowFn.assignsToOneWindow();
+  return result;
+}
 
-  expandInternal(
-    input: PCollection<T>,
-    pipeline: Pipeline,
-    transformProto: runnerApi.PTransform
-  ) {
-    transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: ParDo.urn,
-      payload: runnerApi.ParDoPayload.toBinary(
-        runnerApi.ParDoPayload.create({
-          doFn: runnerApi.FunctionSpec.create({
-            urn: urns.JS_WINDOW_INTO_DOFN_URN,
-            payload: serializeFn({ windowFn: this.windowFn }),
-          }),
-        })
-      ),
-    });
+export function windowInto<T, W extends Window>(
+  windowFn: WindowFn<W>,
+  windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+): PTransform<PCollection<T>, PCollection<T>> {
+  return withName(
+    `WindowInto(${extractName(windowFn)}, ${windowingStrategyBase})`,
+    (
+      input: PCollection<T>,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => {
+      transformProto.spec = runnerApi.FunctionSpec.create({
+        urn: parDo.urn,
+        payload: runnerApi.ParDoPayload.toBinary(
+          runnerApi.ParDoPayload.create({
+            doFn: runnerApi.FunctionSpec.create({
+              urn: urns.JS_WINDOW_INTO_DOFN_URN,
+              payload: serializeFn({ windowFn: windowFn }),
+            }),
+          })
+        ),
+      });
 
-    const inputCoder = pipeline.context.getPCollectionCoderId(input);
-    return pipeline.createPCollectionInternal<T>(
-      inputCoder,
-      WindowInto.createWindowingStrategy(
-        pipeline,
-        this.windowFn,
-        this.windowingStrategyBase
-      )
-    );
-  }
+      const inputCoder = pipeline.context.getPCollectionCoderId(input);
+      return pipeline.createPCollectionInternal<T>(
+        inputCoder,
+        createWindowingStrategyProto(pipeline, windowFn, windowingStrategyBase)
+      );
+    }
+  );
 }
 
 // TODO: (Cleanup) Add restrictions on moving backwards?
-export class AssignTimestamps<T> extends PTransform<
-  PCollection<T>,
-  PCollection<T>
-> {
-  constructor(private func: (T, Instant) => typeof Instant) {
-    super();
-  }
-
-  expandInternal(
-    input: PCollection<T>,
-    pipeline: Pipeline,
-    transformProto: runnerApi.PTransform
-  ) {
-    transformProto.spec = runnerApi.FunctionSpec.create({
-      urn: ParDo.urn,
-      payload: runnerApi.ParDoPayload.toBinary(
-        runnerApi.ParDoPayload.create({
-          doFn: runnerApi.FunctionSpec.create({
-            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
-            payload: serializeFn({ func: this.func }),
-          }),
-        })
-      ),
-    });
+export function assignTimestamps<T>(
+  timestampFn: (T, Instant) => typeof Instant
+): PTransform<PCollection<T>, PCollection<T>> {
+  return withName(
+    `assignTimestamp(${extractName(timestampFn)})`,
+    (
+      input: PCollection<T>,
+      pipeline: Pipeline,
+      transformProto: runnerApi.PTransform
+    ) => {
+      transformProto.spec = runnerApi.FunctionSpec.create({
+        urn: parDo.urn,
+        payload: runnerApi.ParDoPayload.toBinary(
+          runnerApi.ParDoPayload.create({
+            doFn: runnerApi.FunctionSpec.create({
+              urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+              payload: serializeFn({ func: timestampFn }),
+            }),
+          })
+        ),
+      });
 
-    return pipeline.createPCollectionInternal<T>(
-      pipeline.context.getPCollectionCoderId(input)
-    );
-  }
+      return pipeline.createPCollectionInternal<T>(
+        pipeline.context.getPCollectionCoderId(input)
+      );
+    }
+  );
 }
diff --git a/sdks/typescript/src/apache_beam/transforms/windowings.ts b/sdks/typescript/src/apache_beam/transforms/windowings.ts
index 2350e417e04..89c4941ed07 100644
--- a/sdks/typescript/src/apache_beam/transforms/windowings.ts
+++ b/sdks/typescript/src/apache_beam/transforms/windowings.ts
@@ -30,106 +30,70 @@ import {
 } from "../coders/standard_coders";
 import { GlobalWindow, Instant, IntervalWindow } from "../values";
 
-export class GlobalWindows implements WindowFn<GlobalWindow> {
-  assignWindows(Instant) {
-    return [new GlobalWindow()];
-  }
-  windowCoder() {
-    return new GlobalWindowCoder();
-  }
-  toProto() {
-    return {
+export function globalWindows(): WindowFn<GlobalWindow> {
+  return {
+    assignWindows: (Instant) => [new GlobalWindow()],
+    windowCoder: () => new GlobalWindowCoder(),
+    isMerging: () => false,
+    assignsToOneWindow: () => true,
+    toProto: () => ({
       urn: "beam:window_fn:global_windows:v1",
       payload: new Uint8Array(),
-    };
-  }
-  isMerging() {
-    return false;
-  }
-  assignsToOneWindow() {
-    return true;
-  }
+    }),
+  };
 }
 
-export class FixedWindows implements WindowFn<IntervalWindow> {
-  size: Long;
-  offset: Instant; // TODO: (Cleanup) Or should this be a long as well?
-
+export function fixedWindows(
+  sizeSeconds: number | Long,
+  offsetSeconds: Instant = Long.fromValue(0)
+): WindowFn<IntervalWindow> {
   // TODO: (Cleanup) Use a time library?
-  constructor(
-    sizeSeconds: number | Long,
-    offsetSeconds: Instant = Long.fromValue(0)
-  ) {
-    if (typeof sizeSeconds == "number") {
-      this.size = Long.fromValue(sizeSeconds).mul(1000);
-    } else {
-      this.size = sizeSeconds.mul(1000);
-    }
-    this.offset = offsetSeconds.mul(1000);
-  }
+  const sizeMillis = secsToMillisLong(sizeSeconds);
+  const offsetMillis = secsToMillisLong(offsetSeconds);
 
-  assignWindows(t: Instant) {
-    const start = t.sub(t.sub(this.offset).mod(this.size));
-    return [new IntervalWindow(start, start.add(this.size))];
-  }
+  return {
+    assignWindows: (t: Instant) => {
+      const start = t.sub(t.sub(offsetMillis).mod(sizeMillis));
+      return [new IntervalWindow(start, start.add(sizeMillis))];
+    },
 
-  windowCoder() {
-    return new IntervalWindowCoder();
-  }
+    windowCoder: () => new IntervalWindowCoder(),
+    isMerging: () => false,
+    assignsToOneWindow: () => true,
 
-  toProto() {
-    return {
+    toProto: () => ({
       urn: "beam:window_fn:fixed_windows:v1",
       payload: FixedWindowsPayload.toBinary({
-        size: millisToProto(this.size),
-        offset: millisToProto(this.offset),
+        size: millisToProto(sizeMillis),
+        offset: millisToProto(offsetMillis),
       }),
-    };
-  }
-
-  isMerging() {
-    return false;
-  }
-
-  assignsToOneWindow() {
-    return true;
-  }
+    }),
+  };
 }
 
-export class Sessions implements WindowFn<IntervalWindow> {
-  gap: Long;
-
-  constructor(gapSeconds: number | Long) {
-    if (typeof gapSeconds == "number") {
-      this.gap = Long.fromValue(gapSeconds).mul(1000);
-    } else {
-      this.gap = gapSeconds.mul(1000);
-    }
-  }
+export function sessions(gapSeconds: number | Long): WindowFn<IntervalWindow> {
+  const gapMillis = secsToMillisLong(gapSeconds);
 
-  assignWindows(t: Instant) {
-    return [new IntervalWindow(t, t.add(this.gap))];
-  }
+  return {
+    assignWindows: (t: Instant) => [new IntervalWindow(t, t.add(gapMillis))],
+    windowCoder: () => new IntervalWindowCoder(),
+    isMerging: () => true,
+    assignsToOneWindow: () => true,
 
-  windowCoder() {
-    return new IntervalWindowCoder();
-  }
-
-  toProto() {
-    return {
+    toProto: () => ({
       urn: "beam:window_fn:session_windows:v1",
       payload: SessionWindowsPayload.toBinary({
-        gapSize: millisToProto(this.gap),
+        gapSize: millisToProto(gapMillis),
       }),
-    };
-  }
-
-  isMerging() {
-    return true;
-  }
+    }),
+  };
+}
 
-  assignsToOneWindow() {
-    return true;
+function secsToMillisLong(secs: number | Long): Long {
+  if (typeof secs == "number") {
+    return Long.fromValue(secs * 1000);
+  } else {
+    return secs.mul(1000);
   }
 }
 
@@ -139,3 +103,12 @@ function millisToProto(t: Long) {
 
 import { requireForSerialization } from "../serialization";
 requireForSerialization("apache_beam.transforms.windowings", exports);
+requireForSerialization("apache_beam.transforms.windowings", millisToProto);
+requireForSerialization(
+  "apache_beam.transforms.windowings",
+  FixedWindowsPayload
+);
+requireForSerialization(
+  "apache_beam.transforms.windowings",
+  SessionWindowsPayload
+);
diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts
index f7bfa71038b..366e794c0d6 100644
--- a/sdks/typescript/src/apache_beam/worker/operators.ts
+++ b/sdks/typescript/src/apache_beam/worker/operators.ts
@@ -30,7 +30,7 @@ import { PipelineContext } from "../internal/pipeline";
 import { deserializeFn } from "../internal/serialize";
 import { Coder, Context as CoderContext } from "../coders/coders";
 import { Window, Instant, PaneInfo, WindowedValue } from "../values";
-import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { parDo, DoFn, ParDoParam, SplitOptions } from "../transforms/pardo";
 import { WindowFn } from "../transforms/window";
 
 import {
@@ -445,42 +445,37 @@ class IdentityParDoOperator implements IOperator {
 
 class SplittingDoFnOperator implements IOperator {
   constructor(
-    private splitter: (any) => string,
-    private receivers: { [key: string]: Receiver }
+    private receivers: { [key: string]: Receiver },
+    private options: SplitOptions
   ) {}
 
   async startBundle() {}
 
   process(wvalue: WindowedValue<unknown>) {
-    const tag = this.splitter(wvalue.value);
-    const receiver = this.receivers[tag];
-    if (receiver) {
-      return receiver.receive(wvalue);
-    } else {
-      // TODO: (API) Make this configurable.
+    const result = new ProcessResultBuilder();
+    const keys = Object.keys(wvalue.value as object);
+    if (this.options.exclusive && keys.length != 1) {
       throw new Error(
-        "Unexpected tag '" +
-          tag +
-          "' for " +
-          wvalue.value +
-          " not in " +
-          [...Object.keys(this.receivers)]
+        "Multiple keys for exclusively split element: " + wvalue.value
       );
     }
-  }
-
-  async finishBundle() {}
-}
-
-class Splitting2DoFnOperator implements IOperator {
-  constructor(private receivers: { [key: string]: Receiver }) {}
-
-  async startBundle() {}
-
-  process(wvalue: WindowedValue<unknown>) {
-    const result = new ProcessResultBuilder();
-    // TODO: (API) Should I exactly one instead of allowing a union?
-    for (const tag of Object.keys(wvalue.value as object)) {
+    for (let tag of keys) {
+      if (!this.options.knownTags!.includes(tag)) {
+        if (this.options.unknownTagBehavior == "rename") {
+          tag = this.options.unknownTagName!;
+        } else if (this.options.unknownTagBehavior == "ignore") {
+          continue;
+        } else {
+          throw new Error(
+            "Unexpected tag '" +
+              tag +
+              "' for " +
+              wvalue.value +
+              " not in " +
+              this.options.knownTags
+          );
+        }
+      }
       const receiver = this.receivers[tag];
       if (receiver) {
         result.add(
@@ -491,16 +486,6 @@ class Splitting2DoFnOperator implements IOperator {
             pane: wvalue.pane,
           })
         );
-      } else {
-        // TODO: (API) Make this configurable.
-        throw new Error(
-          "Unexpected tag '" +
-            tag +
-            "' for " +
-            wvalue.value +
-            " not in " +
-            [...Object.keys(this.receivers)]
-        );
       }
     }
     return result.build();
@@ -558,7 +543,7 @@ class AssignTimestampsParDoOperator implements IOperator {
 }
 
 registerOperatorConstructor(
-  ParDo.urn,
+  parDo.urn,
   (transformId: string, transform: PTransform, context: OperatorContext) => {
     const receiver = context.getReceiver(
       onlyElement(Object.values(transform.outputs))
@@ -590,22 +575,13 @@ registerOperatorConstructor(
       );
     } else if (spec.doFn?.urn == urns.SPLITTING_JS_DOFN_URN) {
       return new SplittingDoFnOperator(
-        deserializeFn(spec.doFn.payload!).splitter,
-        Object.fromEntries(
-          Object.entries(transform.outputs).map(([tag, pcId]) => [
-            tag,
-            context.getReceiver(pcId),
-          ])
-        )
-      );
-    } else if (spec.doFn?.urn == urns.SPLITTING2_JS_DOFN_URN) {
-      return new Splitting2DoFnOperator(
         Object.fromEntries(
           Object.entries(transform.outputs).map(([tag, pcId]) => [
             tag,
             context.getReceiver(pcId),
           ])
-        )
+        ),
+        deserializeFn(spec.doFn.payload!)
       );
     } else {
       throw new Error("Unknown DoFn type: " + spec);
diff --git a/sdks/typescript/test/combine_test.ts b/sdks/typescript/test/combine_test.ts
index c98e420ee17..f54f0a90e80 100644
--- a/sdks/typescript/test/combine_test.ts
+++ b/sdks/typescript/test/combine_test.ts
@@ -25,17 +25,17 @@ import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runne
 import * as combiners from "../src/apache_beam/transforms/combiners";
 import {
   CombineFn,
-  GroupBy,
-  GroupGlobally,
-  CountPerElement,
-  CountGlobally,
+  groupBy,
+  groupGlobally,
+  countPerElement,
+  countGlobally,
 } from "../src/apache_beam/transforms/group_and_combine";
 
 describe("Apache Beam combiners", function () {
   it("runs wordcount with a countPerKey transform and asserts the result", async function () {
     await new DirectRunner().run((root) => {
       const lines = root.apply(
-        new beam.Create([
+        beam.create([
           "In the beginning God created the heaven and the earth.",
           "And the earth was without form, and void; and darkness was upon the face of the deep.",
           "And the Spirit of God moved upon the face of the waters.",
@@ -49,9 +49,9 @@ describe("Apache Beam combiners", function () {
           yield* line.split(/[^a-z]+/);
         })
         .map((elm) => ({ key: elm, value: 1 }))
-        .apply(new GroupBy("key").combining("value", combiners.sum, "value"))
+        .apply(groupBy("key").combining("value", combiners.sum, "value"))
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { key: "in", value: 1 },
             { key: "the", value: 9 },
             { key: "beginning", value: 1 },
@@ -86,9 +86,7 @@ describe("Apache Beam combiners", function () {
   it("runs wordcount with a countGlobally transform and asserts the result", async function () {
     await new DirectRunner().run((root) => {
       const lines = root.apply(
-        new beam.Create([
-          "And God said, Let there be light: and there was light",
-        ])
+        beam.create(["And God said, Let there be light: and there was light"])
       );
 
       lines
@@ -96,8 +94,8 @@ describe("Apache Beam combiners", function () {
         .flatMap(function* splitWords(line: string) {
           yield* line.split(/[^a-z]+/);
         })
-        .apply(new CountGlobally())
-        .apply(new testing.AssertDeepEqual([11]));
+        .apply(countGlobally())
+        .apply(testing.assertDeepEqual([11]));
     });
   });
 
@@ -138,7 +136,7 @@ describe("Apache Beam combiners", function () {
 
     await new DirectRunner().run((root) => {
       const lines = root.apply(
-        new beam.Create([
+        beam.create([
           "In the beginning God created the heaven and the earth.",
           "And the earth was without form, and void; and darkness was upon the face of the deep.",
           "And the Spirit of God moved upon the face of the waters.",
@@ -153,12 +151,12 @@ describe("Apache Beam combiners", function () {
         })
         .map((word) => word.length)
         .apply(
-          new GroupGlobally()
+          groupGlobally()
             .combining((c) => c, combiners.mean, "mean")
             .combining((c) => c, unstableStdDevCombineFn(), "stdDev")
         )
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { mean: 3.611111111111111, stdDev: 3.2746913580246897 },
           ])
         );
@@ -168,7 +166,7 @@ describe("Apache Beam combiners", function () {
   it("test GroupBy with combining", async function () {
     await new DirectRunner().run((root) => {
       const inputs = root.apply(
-        new beam.Create([
+        beam.create([
           { k: "k1", a: 1, b: 100 },
           { k: "k1", a: 2, b: 200 },
           { k: "k2", a: 9, b: 1000 },
@@ -177,13 +175,13 @@ describe("Apache Beam combiners", function () {
 
       inputs
         .apply(
-          new GroupBy("k")
+          groupBy("k")
             .combining("a", combiners.max, "aMax")
             .combining("a", combiners.sum, "aSum")
             .combining("b", combiners.mean, "mean")
         )
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { k: "k1", aMax: 2, aSum: 3, mean: 150 },
             { k: "k2", aMax: 9, aSum: 9, mean: 1000 },
           ])
@@ -194,7 +192,7 @@ describe("Apache Beam combiners", function () {
   it("test GroupBy list with combining", async function () {
     await new DirectRunner().run((root) => {
       const inputs = root.apply(
-        new beam.Create([
+        beam.create([
           { a: 1, b: 10, c: 100 },
           { a: 2, b: 10, c: 100 },
           { a: 1, b: 10, c: 400 },
@@ -202,18 +200,18 @@ describe("Apache Beam combiners", function () {
       );
 
       inputs
-        .apply(new GroupBy(["a", "b"]).combining("c", combiners.sum, "sum"))
+        .apply(groupBy(["a", "b"]).combining("c", combiners.sum, "sum"))
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { a: 1, b: 10, sum: 500 },
             { a: 2, b: 10, sum: 100 },
           ])
         );
 
       inputs
-        .apply(new GroupBy(["b", "c"]).combining("a", combiners.sum, "sum"))
+        .apply(groupBy(["b", "c"]).combining("a", combiners.sum, "sum"))
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { b: 10, c: 100, sum: 3 },
             { b: 10, c: 400, sum: 1 },
           ])
@@ -224,7 +222,7 @@ describe("Apache Beam combiners", function () {
   it("test GroupBy expr with combining", async function () {
     await new DirectRunner().run((root) => {
       const inputs = root.apply(
-        new beam.Create([
+        beam.create([
           { a: 1, b: 10 },
           { a: 0, b: 20 },
           { a: -1, b: 30 },
@@ -233,14 +231,14 @@ describe("Apache Beam combiners", function () {
 
       inputs
         .apply(
-          new GroupBy((element: any) => element.a * element.a).combining(
+          groupBy((element: any) => element.a * element.a).combining(
             "b",
             combiners.sum,
             "sum"
           )
         )
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { key: 1, sum: 40 },
             { key: 0, sum: 20 },
           ])
@@ -251,7 +249,7 @@ describe("Apache Beam combiners", function () {
   it("test GroupBy with binary combinefn", async function () {
     await new DirectRunner().run((root) => {
       const inputs = root.apply(
-        new beam.Create([
+        beam.create([
           { key: 0, value: 10 },
           { key: 1, value: 20 },
           { key: 0, value: 30 },
@@ -260,12 +258,12 @@ describe("Apache Beam combiners", function () {
 
       inputs
         .apply(
-          new GroupBy("key")
+          groupBy("key")
             .combining("value", (x, y) => x + y, "sum")
             .combining("value", (x, y) => Math.max(x, y), "max")
         )
         .apply(
-          new testing.AssertDeepEqual([
+          testing.assertDeepEqual([
             { key: 0, sum: 40, max: 30 },
             { key: 1, sum: 20, max: 20 },
           ])
diff --git a/sdks/typescript/test/primitives_test.ts b/sdks/typescript/test/primitives_test.ts
index 0840ca7c93c..c95c2a2633e 100644
--- a/sdks/typescript/test/primitives_test.ts
+++ b/sdks/typescript/test/primitives_test.ts
@@ -25,10 +25,6 @@ import {
   IterableCoder,
   KVCoder,
 } from "../src/apache_beam/coders/standard_coders";
-import {
-  GroupBy,
-  GroupGlobally,
-} from "../src/apache_beam/transforms/group_and_combine";
 import * as combiners from "../src/apache_beam/transforms/combiners";
 import { GeneralObjectCoder } from "../src/apache_beam/coders/js_coders";
 
@@ -45,84 +41,74 @@ describe("primitives module", function () {
     it("runs a map", async function () {
       await new DirectRunner().run((root) => {
         const pcolls = root
-          .apply(new beam.Create([1, 2, 3]))
+          .apply(beam.create([1, 2, 3]))
           .map((x) => x * x)
-          .apply(new testing.AssertDeepEqual([1, 4, 9]));
+          .apply(testing.assertDeepEqual([1, 4, 9]));
       });
     });
 
     it("runs a flatmap", async function () {
       await new DirectRunner().run((root) => {
         const pcolls = root
-          .apply(new beam.Create(["a b", "c"]))
+          .apply(beam.create(["a b", "c"]))
           .flatMap((s) => s.split(/ +/))
-          .apply(new testing.AssertDeepEqual(["a", "b", "c"]));
+          .apply(testing.assertDeepEqual(["a", "b", "c"]));
       });
     });
 
     it("runs a Splitter", async function () {
       await new DirectRunner().run((root) => {
         const pcolls = root
-          .apply(new beam.Create(["apple", "apricot", "banana"]))
-          .apply(new beam.Split((e) => e[0], "a", "b"));
-        pcolls.a.apply(new testing.AssertDeepEqual(["apple", "apricot"]));
-        pcolls.b.apply(new testing.AssertDeepEqual(["banana"]));
-      });
-    });
-
-    it("runs a Splitter2", async function () {
-      await new DirectRunner().run((root) => {
-        const pcolls = root
-          .apply(new beam.Create([{ a: 1 }, { b: 10 }, { a: 2, b: 20 }]))
-          .apply(new beam.Split2("a", "b"));
-        pcolls.a.apply(new testing.AssertDeepEqual([1, 2]));
-        pcolls.b.apply(new testing.AssertDeepEqual([10, 20]));
+          .apply(beam.create([{ a: 1 }, { b: 10 }, { a: 2, b: 20 }]))
+          .apply(beam.split(["a", "b"], { exclusive: false }));
+        pcolls.a.apply(testing.assertDeepEqual([1, 2]));
+        pcolls.b.apply(testing.assertDeepEqual([10, 20]));
       });
     });
 
     it("runs a map with context", async function () {
       await new DirectRunner().run((root) => {
         root
-          .apply(new beam.Create([1, 2, 3]))
+          .apply(beam.create([1, 2, 3]))
           .map((a: number, b: number) => a + b, 100)
-          .apply(new testing.AssertDeepEqual([101, 102, 103]));
+          .apply(testing.assertDeepEqual([101, 102, 103]));
       });
     });
 
     it("runs a map with singleton side input", async function () {
       await new DirectRunner().run((root) => {
-        const input = root.apply(new beam.Create([1, 2, 1]));
-        const sideInput = root.apply(new beam.Create([4]));
+        const input = root.apply(beam.create([1, 2, 1]));
+        const sideInput = root.apply(beam.create([4]));
         input
           .map((e, context) => e / context.side.lookup(), {
-            side: new pardo.SingletonSideInput(sideInput),
+            side: pardo.singletonSideInput(sideInput),
           })
-          .apply(new testing.AssertDeepEqual([0.25, 0.5, 0.25]));
+          .apply(testing.assertDeepEqual([0.25, 0.5, 0.25]));
       });
     });
 
     it("runs a map with a side input sharing input root", async function () {
       await new DirectRunner().run((root) => {
-        const input = root.apply(new beam.Create([1, 2, 1]));
+        const input = root.apply(beam.create([1, 2, 1]));
         // TODO: Can this type be inferred?
         const sideInput: beam.PCollection<{ sum: number }> = input.apply(
-          new GroupGlobally().combining((e) => e, combiners.sum, "sum")
+          beam.groupGlobally().combining((e) => e, combiners.sum, "sum")
         );
         input
           .map((e, context) => e / context.side.lookup().sum, {
-            side: new pardo.SingletonSideInput(sideInput),
+            side: pardo.singletonSideInput(sideInput),
           })
-          .apply(new testing.AssertDeepEqual([0.25, 0.5, 0.25]));
+          .apply(testing.assertDeepEqual([0.25, 0.5, 0.25]));
       });
     });
 
     it("runs a map with window-sensitive context", async function () {
       await new DirectRunner().run((root) => {
         root
-          .apply(new beam.Create([1, 2, 3, 4, 5, 10, 11, 12]))
-          .apply(new beam.AssignTimestamps((t) => Long.fromValue(t * 1000)))
-          .apply(new beam.WindowInto(new windowings.FixedWindows(10)))
-          .apply(new beam.GroupBy((e: number) => ""))
+          .apply(beam.create([1, 2, 3, 4, 5, 10, 11, 12]))
+          .apply(beam.assignTimestamps((t) => Long.fromValue(t * 1000)))
+          .apply(beam.windowInto(windowings.fixedWindows(10)))
+          .apply(beam.groupBy((e: number) => ""))
           .map(
             withName(
               "MapWithContext",
@@ -138,10 +124,10 @@ describe("primitives module", function () {
             ),
             // This is the context to pass as the second argument.
             // At each element, context.window.lookup() will return the associated window.
-            { window: new pardo.WindowParam(), other: "A" }
+            { window: pardo.windowParam(), other: "A" }
           )
           .apply(
-            new testing.AssertDeepEqual([
+            testing.assertDeepEqual([
               { key: "", value: [1, 2, 3, 4, 5], window_start_ms: 0, a: "A" },
               { key: "", value: [10, 11, 12], window_start_ms: 10000, a: "A" },
             ])
@@ -152,11 +138,11 @@ describe("primitives module", function () {
     it("runs a WindowInto", async function () {
       await new DirectRunner().run((root) => {
         root
-          .apply(new beam.Create(["apple", "apricot", "banana"]))
-          .apply(new beam.WindowInto(new windowings.GlobalWindows()))
-          .apply(new beam.GroupBy((e: string) => e[0]))
+          .apply(beam.create(["apple", "apricot", "banana"]))
+          .apply(beam.windowInto(windowings.globalWindows()))
+          .apply(beam.groupBy((e: string) => e[0]))
           .apply(
-            new testing.AssertDeepEqual([
+            testing.assertDeepEqual([
               { key: "a", value: ["apple", "apricot"] },
               { key: "b", value: ["banana"] },
             ])
@@ -167,12 +153,12 @@ describe("primitives module", function () {
     it("runs a WindowInto IntervalWindow", async function () {
       await new DirectRunner().run((root) => {
         root
-          .apply(new beam.Create([1, 2, 3, 4, 5, 10, 11, 12]))
-          .apply(new beam.AssignTimestamps((t) => Long.fromValue(t * 1000)))
-          .apply(new beam.WindowInto(new windowings.FixedWindows(10)))
-          .apply(new beam.GroupBy((e: number) => ""))
+          .apply(beam.create([1, 2, 3, 4, 5, 10, 11, 12]))
+          .apply(beam.assignTimestamps((t) => Long.fromValue(t * 1000)))
+          .apply(beam.windowInto(windowings.fixedWindows(10)))
+          .apply(beam.groupBy((e: number) => ""))
           .apply(
-            new testing.AssertDeepEqual([
+            testing.assertDeepEqual([
               { key: "", value: [1, 2, 3, 4, 5] },
               { key: "", value: [10, 11, 12] },
             ])
@@ -185,7 +171,7 @@ describe("primitives module", function () {
     // TODO: test output with direct runner.
     it("runs a basic Impulse expansion", function () {
       var p = new Pipeline();
-      var res = new beam.Root(p).apply(new beam.Impulse());
+      var res = new beam.Root(p).apply(beam.impulse());
 
       assert.equal(res.type, "pcollection");
       assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder());
@@ -193,7 +179,7 @@ describe("primitives module", function () {
     it("runs a ParDo expansion", function () {
       var p = new Pipeline();
       var res = new beam.Root(p)
-        .apply(new beam.Impulse())
+        .apply(beam.impulse())
         .map(function (v: any) {
           return v * 2;
         })
@@ -211,11 +197,11 @@ describe("primitives module", function () {
     it("runs a GroupBy expansion", function () {
       var p = new Pipeline();
       var res = new beam.Root(p)
-        .apply(new beam.Impulse())
+        .apply(beam.impulse())
         .map(function createElement(v) {
           return { name: "pablo", lastName: "wat" };
         })
-        .apply(new GroupBy("lastName"));
+        .apply(beam.groupBy("lastName"));
 
       assert.deepEqual(
         p.context.getPCollectionCoder(res),
diff --git a/sdks/typescript/test/standard_coders_test.ts b/sdks/typescript/test/standard_coders_test.ts
index dd80ce12d66..3d34199fccb 100644
--- a/sdks/typescript/test/standard_coders_test.ts
+++ b/sdks/typescript/test/standard_coders_test.ts
@@ -137,29 +137,23 @@ describe("standard Beam coders on Javascript", function () {
     contexts.forEach((context) => {
       describe("in Context " + context, function () {
         const spec = doc;
-
-        const coderConstructor = globalRegistry().get(urn);
-        var coder;
+        var components;
         if (spec.coder.components) {
-          var components;
-          try {
-            components = spec.coder.components.map(
-              (c) => new (globalRegistry().get(c.urn))()
-            );
-          } catch (Error) {
-            return;
-          }
-          coder = new coderConstructor(...components);
+          components = spec.coder.components.map(
+            // Second-level (component) coders have neither payloads nor components.
+            (c) => globalRegistry().getCoder(c.urn)
+          );
         } else {
-          coder = new coderConstructor();
+          components = [];
         }
-        describeCoder(coder, urn, context, spec);
+        const coder = globalRegistry().getCoder(urn, undefined, ...components);
+        runCoderTest(coder, urn, context, spec);
       });
     });
   });
 });
 
-function describeCoder<T>(coder: Coder<T>, urn, context, spec: CoderSpec) {
+function runCoderTest<T>(coder: Coder<T>, urn, context, spec: CoderSpec) {
   describe(
     util.format(
       "coder %s (%s)",
diff --git a/sdks/typescript/test/wordcount.ts b/sdks/typescript/test/wordcount.ts
index f13655616e3..b7267ef5589 100644
--- a/sdks/typescript/test/wordcount.ts
+++ b/sdks/typescript/test/wordcount.ts
@@ -19,9 +19,6 @@
 import * as beam from "../src/apache_beam";
 import { DirectRunner } from "../src/apache_beam/runners/direct_runner";
 import * as testing from "../src/apache_beam/testing/assert";
-import { KV } from "../src/apache_beam/values";
-import { GroupBy } from "../src/apache_beam/transforms/group_and_combine";
-import * as combiners from "../src/apache_beam/transforms/combiners";
 
 import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runner";
 
@@ -33,29 +30,14 @@ function wordCount(
     .flatMap(function* splitWords(line: string) {
       yield* line.split(/[^a-z]+/);
     })
-    .apply(new CountElements("Count"));
-}
-
-class CountElements extends beam.PTransform<
-  beam.PCollection<any>,
-  beam.PCollection<KV<any, number>>
-> {
-  expand(input: beam.PCollection<any>) {
-    return input.apply(
-      new GroupBy((e) => e, "element").combining(
-        (e) => 1,
-        combiners.sum,
-        "count"
-      )
-    );
-  }
+    .apply(beam.countPerElement());
 }
 
 describe("wordcount", function () {
   it("wordcount", async function () {
     await new DirectRunner().run((root) => {
       const lines = root.apply(
-        new beam.Create([
+        beam.create([
           "In the beginning God created the heaven and the earth.",
           "And the earth was without form, and void; and darkness was upon the face of the deep.",
           "And the Spirit of God moved upon the face of the waters.",
@@ -70,13 +52,11 @@ describe("wordcount", function () {
   it("wordcount assert", async function () {
     await new DirectRunner().run((root) => {
       const lines = root.apply(
-        new beam.Create([
-          "And God said, Let there be light: and there was light",
-        ])
+        beam.create(["And God said, Let there be light: and there was light"])
       );
 
       lines.apply(wordCount).apply(
-        new testing.AssertDeepEqual([
+        testing.assertDeepEqual([
           { element: "and", count: 2 },
           { element: "god", count: 1 },
           { element: "said", count: 1 },