You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/05/13 22:00:52 UTC

[beam] branch master updated: Add some auto-starting runners to the typescript SDK. (#17580)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e85dab799 Add some auto-starting runners to the typescript SDK. (#17580)
66e85dab799 is described below

commit 66e85dab799963368b9d210ac8b1d1d5635efcda
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri May 13 15:00:42 2022 -0700

    Add some auto-starting runners to the typescript SDK. (#17580)
    
    Adds out-of-the-box support for FlinkRunner, DataflowRunner, and the Python Universal Local runner. Also adds a DefaultRunner which chooses between the DirectRunner and the ULR depending on the properties of the pipeline.
---
 .../sdk/extensions/python/bootstrap_beam_venv.py   |   2 +-
 sdks/typescript/package-lock.json                  |  15 ++-
 sdks/typescript/package.json                       |   1 +
 .../src/apache_beam/examples/wordcount.ts          |  20 +++-
 .../typescript/src/apache_beam/runners/dataflow.ts |  44 +++++++++
 .../src/apache_beam/runners/direct_runner.ts       |  47 +++++++++
 sdks/typescript/src/apache_beam/runners/flink.ts   |  84 ++++++++++++++++
 .../apache_beam/runners/portable_runner/runner.ts  |  85 ++++++++++------
 sdks/typescript/src/apache_beam/runners/runner.ts  |  45 ++++++++-
 .../src/apache_beam/runners/universal.ts           |  41 ++++++++
 .../src/apache_beam/transforms/internal.ts         |   2 +
 sdks/typescript/src/apache_beam/utils/service.ts   | 108 +++++++++++++++++----
 sdks/typescript/src/apache_beam/worker/data.ts     |   3 +
 .../apache_beam/worker/external_worker_service.ts  |  47 +++++----
 sdks/typescript/src/apache_beam/worker/worker.ts   |  12 ++-
 15 files changed, 479 insertions(+), 77 deletions(-)

diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
index 08120b84adb..c113676a8f7 100644
--- a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
+++ b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
@@ -77,7 +77,7 @@ def main():
           or options.beam_version.startswith('https://')):
         # It's a path to a tarball.
         beam_version = os.path.basename(options.beam_version)
-        beam_package = options.beam_version
+        beam_package = options.beam_version + '[gcp,aws,azure,dataframe]'
     else:
         beam_version = options.beam_version
-        beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version
+        beam_package = 'apache_beam[gcp,aws,azure,dataframe]==' + beam_version
diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json
index 49e62618f88..c39002e6001 100644
--- a/sdks/typescript/package-lock.json
+++ b/sdks/typescript/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "apache_beam",
-  "version": "0.37.0.dev",
+  "version": "0.38.0",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "apache_beam",
-      "version": "0.37.0.dev",
+      "version": "0.38.0",
       "dependencies": {
         "@grpc/grpc-js": "^1.4.6",
         "@protobuf-ts/grpc-transport": "^2.1.0",
@@ -16,6 +16,7 @@
         "chai": "^4.3.4",
         "date-fns": "^2.28.0",
         "fast-deep-equal": "^3.1.3",
+        "find-git-root": "^1.0.4",
         "long": "^4.0.0",
         "protobufjs": "^6.10.2",
         "queue-typescript": "^1.0.1",
@@ -919,6 +920,11 @@
         "node": ">=8"
       }
     },
+    "node_modules/find-git-root": {
+      "version": "1.0.4",
+      "resolved": "https://registry.npmjs.org/find-git-root/-/find-git-root-1.0.4.tgz",
+      "integrity": "sha512-468fmirKKgcrqfZfPn0xIpwZUUsZQcYXfx0RC2/jX39GPz83TwutQNZZhDrI6HqjO8cRejxQVaUY8GQdXopFfA=="
+    },
     "node_modules/find-up": {
       "version": "5.0.0",
       "resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz",
@@ -2942,6 +2948,11 @@
         "to-regex-range": "^5.0.1"
       }
     },
+    "find-git-root": {
+      "version": "1.0.4",
+      "resolved": "https://registry.npmjs.org/find-git-root/-/find-git-root-1.0.4.tgz",
+      "integrity": "sha512-468fmirKKgcrqfZfPn0xIpwZUUsZQcYXfx0RC2/jX39GPz83TwutQNZZhDrI6HqjO8cRejxQVaUY8GQdXopFfA=="
+    },
     "find-up": {
       "version": "5.0.0",
       "resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz",
diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json
index 1e13de36bfc..6ab659ccf2e 100644
--- a/sdks/typescript/package.json
+++ b/sdks/typescript/package.json
@@ -32,6 +32,7 @@
     "chai": "^4.3.4",
     "date-fns": "^2.28.0",
     "fast-deep-equal": "^3.1.3",
+    "find-git-root": "^1.0.4",
     "long": "^4.0.0",
     "protobufjs": "^6.10.2",
     "queue-typescript": "^1.0.1",
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts
index d68d0f25600..961afb43e9b 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts
@@ -16,10 +16,24 @@
  * limitations under the License.
  */
 
-// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+// Run directly with
+//
+//    node dist/src/apache_beam/examples/wordcount.js
+//
+// A different runner can be chosen via a --runner argument, e.g.
+//
+//    node dist/src/apache_beam/examples/wordcount.js --runner=flink
+//
+// To run on Dataflow, pass the required arguments:
+//
+//    node dist/src/apache_beam/examples/wordcount.js --runner=dataflow --project=PROJECT_ID --tempLocation=gs://BUCKET/DIR --region=us-central1
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam?
+
+import * as yargs from "yargs";
 
 import * as beam from "../../apache_beam";
-import { DirectRunner } from "../runners/direct_runner";
+import { createRunner } from "../runners/runner";
 
 import { count } from "../transforms/combiners";
 import { GroupBy } from "../transforms/group_and_combine";
@@ -45,7 +59,7 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
 }
 
 async function main() {
-  await new DirectRunner().run((root) => {
+  await createRunner(yargs.argv).run((root) => {
     const lines = root.apply(
       new beam.Create([
         "In the beginning God created the heaven and the earth.",
diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts
new file mode 100644
index 00000000000..958eb99c956
--- /dev/null
+++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Pipeline } from "../internal/pipeline";
+import { PipelineResult, Runner } from "./runner";
+import { PortableRunner } from "./portable_runner/runner";
+import { PythonService } from "../utils/service";
+
+export function dataflowRunner(runnerOptions: {
+  project: string;
+  tempLocation: string;
+  region: string;
+  [others: string]: any;
+}): Runner {
+  return new (class extends Runner {
+    async runPipeline(
+      pipeline: Pipeline,
+      options: Object = {}
+    ): Promise<PipelineResult> {
+      return new PortableRunner(
+        runnerOptions as any,
+        new PythonService("apache_beam.runners.dataflow.dataflow_job_service", [
+          "--port",
+          "{{PORT}}",
+        ])
+      ).runPipeline(pipeline, options);
+    }
+  })();
+}
diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
index a1a10621af9..ff203d06c41 100644
--- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
@@ -44,14 +44,61 @@ import {
 } from "../values";
 import { PaneInfoCoder } from "../coders/standard_coders";
 import { Coder, Context as CoderContext } from "../coders/coders";
+import * as environments from "../internal/environments";
 import { serializeFn, deserializeFn } from "../internal/serialize";
 
+const SUPPORTED_REQUIREMENTS: string[] = [];
+
+export function directRunner(options: Object = {}): Runner {
+  return new DirectRunner(options);
+}
+
 export class DirectRunner extends Runner {
   // All the operators for a given pipeline should share the same state.
   // This global mapping allows operators to look up a shared state object for
   // a given pipeline on deserialization.
   static inMemoryStatesRefs: Map<string, InMemoryStateProvider> = new Map();
 
+  constructor(private options: Object = {}) {
+    super();
+  }
+
+  unsupportedFeatures(pipeline, options: Object = {}): string[] {
+    return [...this.unsupportedFeaturesIter(pipeline, options)];
+  }
+
+  *unsupportedFeaturesIter(pipeline, options: Object = {}) {
+    const proto: runnerApi.Pipeline = pipeline.proto;
+    for (const requirement of proto.requirements) {
+      if (!SUPPORTED_REQUIREMENTS.includes(requirement)) {
+        yield requirement;
+      }
+    }
+
+    for (const env of Object.values(proto.components!.environments)) {
+      if (
+        env.urn &&
+        env.urn != environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN
+      ) {
+        yield env.urn;
+      }
+    }
+
+    for (const windowing of Object.values(
+      proto.components!.windowingStrategies
+    )) {
+      if (
+        ![
+          runnerApi.MergeStatus_Enum.UNSPECIFIED,
+          runnerApi.MergeStatus_Enum.NON_MERGING,
+          runnerApi.MergeStatus_Enum.ALREADY_MERGED,
+        ].includes(windowing.mergeStatus)
+      ) {
+        yield "MergeStatus=" + windowing.mergeStatus;
+      }
+    }
+  }
+
   async runPipeline(p): Promise<PipelineResult> {
     // console.dir(p.proto, { depth: null });
 
diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts
new file mode 100644
index 00000000000..4acb68e642f
--- /dev/null
+++ b/sdks/typescript/src/apache_beam/runners/flink.ts
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const os = require("os");
+const path = require("path");
+
+import { Pipeline } from "../internal/pipeline";
+import { PipelineResult, Runner } from "./runner";
+import { PortableRunner } from "./portable_runner/runner";
+import { JavaJarService } from "../utils/service";
+
+const MAGIC_HOST_NAMES = ["[local]", "[auto]"];
+
+// These should stay in sync with gradle.properties.
+const PUBLISHED_FLINK_VERSIONS = ["1.12", "1.13", "1.14"];
+
+const defaultOptions = {
+  flinkMaster: "[local]",
+  flinkVersion: PUBLISHED_FLINK_VERSIONS[PUBLISHED_FLINK_VERSIONS.length - 1],
+};
+
+export function flinkRunner(runnerOptions: Object = {}): Runner {
+  return new (class extends Runner {
+    async runPipeline(
+      pipeline: Pipeline,
+      options: Object = {}
+    ): Promise<PipelineResult> {
+      const allOptions = {
+        ...defaultOptions,
+        ...runnerOptions,
+        ...options,
+      } as any;
+      if (
+        !allOptions.environmentType &&
+        MAGIC_HOST_NAMES.includes(allOptions.flinkMaster)
+      ) {
+        allOptions.environmentType = "LOOPBACK";
+      }
+      if (!allOptions.artifactsDir) {
+        allOptions.artifactsDir = fs.mkdtempSync(
+          path.join(os.tmpdir(), "flinkArtifactsDir")
+        );
+      }
+
+      const jobServerJar =
+        allOptions.flinkJobServerJar ||
+        (await JavaJarService.cachedJar(
+          JavaJarService.gradleToJar(
+            `runners:flink:${allOptions.flinkVersion}:job-server:shadowJar`
+          )
+        ));
+      const jobServer = new JavaJarService(jobServerJar, [
+        "--flink-master",
+        allOptions.flinkMaster,
+        "--artifacts-dir",
+        allOptions.artifactsDir,
+        "--job-port",
+        "{{PORT}}",
+        "--artifact-port",
+        "0",
+        "--expansion-port",
+        "0",
+      ]);
+
+      return new PortableRunner(allOptions, jobServer).runPipeline(pipeline);
+    }
+  })();
+}
diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
index 3081249539f..f5281aa969d 100644
--- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
@@ -28,11 +28,12 @@ import { ArtifactStagingServiceClient } from "../../proto/beam_artifact_api.clie
 import { Pipeline } from "../../internal/pipeline";
 import { PipelineResult, Runner } from "../runner";
 import { PipelineOptions } from "../../options/pipeline_options";
-import { JobState_Enum } from "../../proto/beam_job_api";
+import { JobState_Enum, JobStateEvent } from "../../proto/beam_job_api";
 
 import { ExternalWorkerPool } from "../../worker/external_worker_service";
 import * as environments from "../../internal/environments";
 import * as artifacts from "../artifacts";
+import { Service as JobService } from "../../utils/service";
 
 const TERMINAL_STATES = [
   JobState_Enum.DONE,
@@ -42,19 +43,22 @@ const TERMINAL_STATES = [
   JobState_Enum.DRAINED,
 ];
 
+type completionCallback = (terminalState: JobStateEvent) => Promise<unknown>;
+
 class PortableRunnerPipelineResult implements PipelineResult {
   jobId: string;
   runner: PortableRunner;
-  workers?: ExternalWorkerPool;
+  completionCallbacks: completionCallback[];
+  terminalState?: JobStateEvent;
 
   constructor(
     runner: PortableRunner,
     jobId: string,
-    workers: ExternalWorkerPool | undefined = undefined
+    completionCallbacks: completionCallback[]
   ) {
     this.runner = runner;
     this.jobId = jobId;
-    this.workers = workers;
+    this.completionCallbacks = completionCallbacks;
   }
 
   static isTerminal(state: JobState_Enum) {
@@ -62,13 +66,15 @@ class PortableRunnerPipelineResult implements PipelineResult {
   }
 
   async getState() {
+    if (this.terminalState) {
+      return this.terminalState;
+    }
     const state = await this.runner.getJobState(this.jobId);
-    if (
-      this.workers != undefined &&
-      PortableRunnerPipelineResult.isTerminal(state.state)
-    ) {
-      this.workers.stop();
-      this.workers = undefined;
+    if (PortableRunnerPipelineResult.isTerminal(state.state)) {
+      this.terminalState = state;
+      for (const callback of this.completionCallbacks) {
+        await callback(state);
+      }
     }
     return state;
   }
@@ -96,11 +102,12 @@ class PortableRunnerPipelineResult implements PipelineResult {
 }
 
 export class PortableRunner extends Runner {
-  client: JobServiceClient;
+  client?: JobServiceClient;
   defaultOptions: any;
 
   constructor(
-    options: string | { jobEndpoint: string; [others: string]: any }
+    options: string | { jobEndpoint: string; [others: string]: any },
+    private jobService: JobService | undefined = undefined
   ) {
     super();
     if (typeof options == "string") {
@@ -108,16 +115,25 @@ export class PortableRunner extends Runner {
     } else if (options) {
       this.defaultOptions = options;
     }
-    this.client = new JobServiceClient(
-      new GrpcTransport({
-        host: this.defaultOptions?.jobEndpoint,
-        channelCredentials: ChannelCredentials.createInsecure(),
-      })
-    );
+  }
+
+  async getClient(): Promise<JobServiceClient> {
+    if (!this.client) {
+      if (this.jobService) {
+        this.defaultOptions.jobEndpoint = await this.jobService.start();
+      }
+      this.client = new JobServiceClient(
+        new GrpcTransport({
+          host: this.defaultOptions?.jobEndpoint,
+          channelCredentials: ChannelCredentials.createInsecure(),
+        })
+      );
+    }
+    return this.client;
   }
 
   async getJobState(jobId: string) {
-    const call = this.client.getState({ jobId });
+    const call = (await this.getClient()).getState({ jobId });
     return await call.response;
   }
 
@@ -138,11 +154,18 @@ export class PortableRunner extends Runner {
       options = { ...this.defaultOptions, ...options };
     }
 
-    const use_loopback_service =
-      (options as any)?.environmentType == "LOOPBACK";
-    const workers = use_loopback_service ? new ExternalWorkerPool() : undefined;
-    if (use_loopback_service) {
-      workers!.start();
+    const completionCallbacks: completionCallback[] = [];
+
+    if (this.jobService) {
+      const jobService = this.jobService;
+      completionCallbacks.push(() => jobService.stop());
+    }
+
+    let loopbackAddress: string | undefined = undefined;
+    if ((options as any)?.environmentType == "LOOPBACK") {
+      const workers = new ExternalWorkerPool();
+      loopbackAddress = await workers.start();
+      completionCallbacks.push(() => workers.stop());
     }
 
     // Replace the default environment according to the pipeline options.
@@ -151,9 +174,9 @@ export class PortableRunner extends Runner {
       pipeline.components!.environments
     )) {
       if (env.urn == environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN) {
-        if (use_loopback_service) {
+        if (loopbackAddress) {
           pipeline.components!.environments[envId] =
-            environments.asExternalEnvironment(env, workers!.address);
+            environments.asExternalEnvironment(env, loopbackAddress);
         } else {
           pipeline.components!.environments[envId] =
             environments.asDockerEnvironment(
@@ -166,6 +189,7 @@ export class PortableRunner extends Runner {
     }
 
     // Inform the runner that we'd like to execute this pipeline.
+    console.debug("Preparing job.");
     let message: PrepareJobRequest = {
       pipeline,
       jobName: (options as any)?.jobName || "",
@@ -182,10 +206,12 @@ export class PortableRunner extends Runner {
         )
       );
     }
-    const prepareResponse = await this.client.prepare(message).response;
+    const client = await this.getClient();
+    const prepareResponse = await client.prepare(message).response;
 
     // Allow the runner to fetch any artifacts it can't interpret.
     if (prepareResponse.artifactStagingEndpoint) {
+      console.debug("Staging artifacts");
       await artifacts.offerArtifacts(
         new ArtifactStagingServiceClient(
           new GrpcTransport({
@@ -198,7 +224,8 @@ export class PortableRunner extends Runner {
     }
 
     // Actually kick off the job.
-    const runCall = this.client.run({
+    console.debug("Running job.");
+    const runCall = client.run({
       preparationId: prepareResponse.preparationId,
       retrievalToken: "",
     });
@@ -208,6 +235,6 @@ export class PortableRunner extends Runner {
     // If desired, the user can use this handle to await job completion, but
     // this function returns as soon as the job is successfully started, not
     // once the job has completed.
-    return new PortableRunnerPipelineResult(this, jobId, workers);
+    return new PortableRunnerPipelineResult(this, jobId, completionCallbacks);
   }
 }
diff --git a/sdks/typescript/src/apache_beam/runners/runner.ts b/sdks/typescript/src/apache_beam/runners/runner.ts
index 272ef840783..7fc2ad79477 100644
--- a/sdks/typescript/src/apache_beam/runners/runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/runner.ts
@@ -25,11 +25,38 @@ export interface PipelineResult {
   waitUntilFinish(duration?: number): Promise<JobState_Enum>;
 }

+/**
+ * Creates the Runner selected by `options.runner`: "direct", "universal",
+ * "flink", or "dataflow"; an unset runner or "default" selects the default
+ * runner, which chooses between the direct and universal runners per
+ * pipeline. The whole `options` object is passed to the runner's factory.
+ *
+ * @throws Error if `options.runner` names an unknown runner.
+ */
+export function createRunner(options: any): Runner {
+  // Runner modules are require()d lazily so only the selected one is loaded.
+  let runnerConstructor: (runnerOptions: any) => Runner;
+  if (options.runner == undefined || options.runner == "default") {
+    runnerConstructor = defaultRunner;
+  } else if (options.runner == "direct") {
+    runnerConstructor = require("./direct_runner").directRunner;
+  } else if (options.runner == "universal") {
+    runnerConstructor = require("./universal").universalRunner;
+  } else if (options.runner == "flink") {
+    runnerConstructor = require("./flink").flinkRunner;
+  } else if (options.runner == "dataflow") {
+    runnerConstructor = require("./dataflow").dataflowRunner;
+  } else {
+    throw new Error("Unknown runner: " + options.runner);
+  }
+  return runnerConstructor(options);
+}
+
 /**
  * A Runner is the object that takes a pipeline definition and actually
  * executes, e.g. locally or on a distributed system.
  */
-export class Runner {
+export abstract class Runner {
   /**
    * Runs the transform.
    *
@@ -64,10 +91,27 @@ export class Runner {
     return this.runPipeline(p);
   }

-  protected async runPipeline(
+  abstract runPipeline(
     pipeline: Pipeline,
     options?: PipelineOptions
-  ): Promise<PipelineResult> {
-    throw new Error("Not implemented.");
-  }
+  ): Promise<PipelineResult>;
+}
+
+export function defaultRunner(defaultOptions: Object): Runner {
+  return new (class extends Runner {
+    async runPipeline(
+      pipeline: Pipeline,
+      options: Object = {}
+    ): Promise<PipelineResult> {
+      const directRunner =
+        require("./direct_runner").directRunner(defaultOptions);
+      if (directRunner.unsupportedFeatures(pipeline, options).length == 0) {
+        return directRunner.runPipeline(pipeline, options);
+      } else {
+        return require("./universal")
+          .universalRunner(defaultOptions)
+          .runPipeline(pipeline, options);
+      }
+    }
+  })();
 }
diff --git a/sdks/typescript/src/apache_beam/runners/universal.ts b/sdks/typescript/src/apache_beam/runners/universal.ts
new file mode 100644
index 00000000000..c2c4db14c69
--- /dev/null
+++ b/sdks/typescript/src/apache_beam/runners/universal.ts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Pipeline } from "../internal/pipeline";
+import { PipelineResult, Runner } from "./runner";
+import { PortableRunner } from "./portable_runner/runner";
+import { PythonService } from "../utils/service";
+
+export function universalRunner(runnerOptions: {
+  [others: string]: any;
+}): Runner {
+  return new (class extends Runner {
+    async runPipeline(
+      pipeline: Pipeline,
+      options: Object = {}
+    ): Promise<PipelineResult> {
+      return new PortableRunner(
+        runnerOptions as any,
+        new PythonService(
+          "apache_beam.runners.portability.local_job_service_main",
+          ["--port", "{{PORT}}"]
+        )
+      ).runPipeline(pipeline, options);
+    }
+  })();
+}
diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts
index 232cfd2832f..27f03c836ab 100644
--- a/sdks/typescript/src/apache_beam/transforms/internal.ts
+++ b/sdks/typescript/src/apache_beam/transforms/internal.ts
@@ -55,6 +55,7 @@ export class Impulse extends PTransform<Root, PCollection<Uint8Array>> {
       urn: Impulse.urn,
       payload: urns.IMPULSE_BUFFER,
     });
+    transformProto.environmentId = "";
     return pipeline.createPCollectionInternal(new BytesCoder());
   }
 }
@@ -134,6 +135,7 @@ export class GroupByKey<K, V> extends PTransform<
       urn: GroupByKey.urn,
       payload: undefined!,
     });
+    transformProto.environmentId = "";
 
     // TODO: (Cleanup) warn about BsonObjectCoder and (non)deterministic key ordering?
     const keyCoder = pipeline.getCoder(inputCoderProto.componentCoderIds[0]);
diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts
index 693731dc865..bb5554e3123 100644
--- a/sdks/typescript/src/apache_beam/utils/service.ts
+++ b/sdks/typescript/src/apache_beam/utils/service.ts
@@ -22,6 +22,7 @@ const os = require("os");
 const net = require("net");
 const path = require("path");
 const childProcess = require("child_process");
+const findGitRoot = require("find-git-root");
 
 // TODO: (Typescript) Why can't the var above be used as a namespace?
 import { ChildProcess } from "child_process";
@@ -47,17 +48,35 @@ export class SubprocessService {
   process: ChildProcess;
   cmd: string;
   args: string[];
+  name: string;
 
-  constructor(cmd: string, args: string[]) {
+  constructor(
+    cmd: string,
+    args: string[],
+    name: string | undefined = undefined
+  ) {
     this.cmd = cmd;
     this.args = args;
+    this.name = name || cmd;
+  }
+
+  static async freePort(): Promise<number> {
+    return new Promise((resolve) => {
+      const srv = net.createServer();
+      srv.listen(0, () => {
+        const port = srv.address().port;
+        srv.close((_) => resolve(port));
+      });
+    });
   }
 
   async start() {
-    // TODO: (Cleanup) Choose a free port.
     const host = "localhost";
-    const port = "7778";
-    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    const port = (await SubprocessService.freePort()).toString();
+    console.debug(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port))
+    );
     this.process = childProcess.spawn(
       this.cmd,
       this.args.map((arg) => arg.replace("{{PORT}}", port)),
@@ -67,7 +86,11 @@ export class SubprocessService {
     );
 
     try {
+      console.debug(
+        `Waiting for ${this.name} to be available on port ${port}.`
+      );
       await this.portReady(port, host, 10000);
+      console.debug(`Service ${this.name} available.`);
     } catch (error) {
       this.process.kill();
       throw error;
@@ -77,6 +100,7 @@ export class SubprocessService {
   }
 
   async stop() {
+    console.log(`Tearing down ${this.name}.`);
     this.process.kill();
   }
 
@@ -91,9 +115,9 @@ export class SubprocessService {
       try {
         await new Promise<void>((resolve, reject) => {
           const socket = net.createConnection(port, host, () => {
-            resolve();
-            socket.end();
             connected = true;
+            socket.end();
+            resolve();
           });
           socket.on("error", (err) => {
             reject(err);
@@ -123,10 +147,12 @@ export function serviceProviderFromJavaGradleTarget(
   };
 }
 
+const BEAM_CACHE = path.join(os.homedir(), ".apache_beam", "cache");
+
 export class JavaJarService extends SubprocessService {
   static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
   static BEAM_GROUP_ID = "org.apache.beam";
-  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+  static JAR_CACHE = path.join(BEAM_CACHE, "jars");
 
   constructor(jar: string, args: string[] | undefined = undefined) {
     if (args == undefined) {
@@ -185,16 +211,7 @@ export class JavaJarService extends SubprocessService {
     }
     const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
     const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
-    // TODO: Do this more robustly, e.g. use the git root.
-    const projectRoot = path.resolve(
-      __dirname,
-      "..",
-      "..",
-      "..",
-      "..",
-      "..",
-      ".."
-    );
+    const projectRoot = path.dirname(findGitRoot(__dirname));
     const localPath = path.join(
       projectRoot,
       gradlePackage.replaceAll(":", path.sep),
@@ -256,3 +273,60 @@ export class JavaJarService extends SubprocessService {
     );
   }
 }
+
+export class PythonService extends SubprocessService {
+  static VENV_CACHE = path.join(BEAM_CACHE, "venvs");
+
+  static whichPython(): string {
+    for (const bin of ["python3", "python"]) {
+      try {
+        const result = childProcess.spawnSync(bin, ["--version"]);
+        if (result.status == 0) {
+          return bin;
+        }
+      } catch (err) {
+        // Try the next one.
+      }
+    }
+    throw new Error("Can't find a Python executable.");
+  }
+
+  static beamPython(): string {
+    const projectRoot = path.dirname(findGitRoot(__dirname));
+    // TODO: Package this up with the npm.
+    const bootstrapScript = path.join(
+      projectRoot,
+      "sdks",
+      "java",
+      "extensions",
+      "python",
+      "src",
+      "main",
+      "resources",
+      "org",
+      "apache",
+      "beam",
+      "sdk",
+      "extensions",
+      "python",
+      "bootstrap_beam_venv.py"
+    );
+    console.debug("Invoking Python bootstrap script.");
+    const result = childProcess.spawnSync(
+      PythonService.whichPython(),
+      [bootstrapScript],
+      { encoding: "latin1" }
+    );
+    if (result.status == 0) {
+      console.debug(result.stdout);
+      const lines = result.stdout.trim().split("\n");
+      return lines[lines.length - 1];
+    } else {
+      throw new Error(result.output);
+    }
+  }
+
+  constructor(module: string, args: string[] = []) {
+    super(PythonService.beamPython(), ["-u", "-m", module].concat(args));
+  }
+}
diff --git a/sdks/typescript/src/apache_beam/worker/data.ts b/sdks/typescript/src/apache_beam/worker/data.ts
index b68ba41ff46..436010c109a 100644
--- a/sdks/typescript/src/apache_beam/worker/data.ts
+++ b/sdks/typescript/src/apache_beam/worker/data.ts
@@ -72,6 +72,9 @@ export class MultiplexingDataChannel {
         }
       }
     });
+    this.dataChannel.on("error", (err) => {
+      console.log("Data channel error", err);
+    });
   }
 
   close() {
diff --git a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts
index 02f77e0d877..f20b8fb904a 100644
--- a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts
+++ b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts
@@ -36,13 +36,12 @@ export class ExternalWorkerPool {
   server: grpc.Server;
   workers: Map<string, Worker> = new Map();
 
-  // TODO: (Cleanup) Choose a free port.
-  constructor(address: string = "localhost:5555") {
+  constructor(address: string = "localhost:0") {
     this.address = address;
   }
 
-  start() {
-    console.log("Starting the workers at ", this.address);
+  async start(): Promise<string> {
+    console.log("Starting loopback workers at ", this.address);
     const this_ = this;
 
     this.server = new grpc.Server();
@@ -87,23 +86,35 @@ export class ExternalWorkerPool {
       },
     };
 
-    this.server.bindAsync(
-      this.address,
-      grpc.ServerCredentials.createInsecure(),
-      (err: Error | null, port: number) => {
-        if (err) {
-          console.error(`Server error: ${err.message}`);
-        } else {
-          console.log(`Server bound on port: ${port}`);
-          this_.server.start();
-        }
-      }
-    );
-
     this.server.addService(beamFnExternalWorkerPoolDefinition, workerService);
+
+    return new Promise((resolve, reject) => {
+      this.server.bindAsync(
+        this.address,
+        grpc.ServerCredentials.createInsecure(),
+        (err: Error | null, port: number) => {
+          if (err) {
+            reject(`Error starting loopback service: ${err.message}`);
+          } else {
+            console.log(`Server bound on port: ${port}`);
+            this_.address = `localhost:${port}`;
+            this_.server.start();
+            resolve(this_.address);
+          }
+        }
+      );
+    });
   }
 
-  stop() {
+  async stop(timeoutMs = 100) {
+    console.debug("Shutting down external workers.");
+    // Let the runner attempt to gracefully shut these down: poll, without
+    // blocking the event loop, until no workers remain or timeoutMs passes.
+    const start = Date.now();
+    while (this.workers.size > 0 && Date.now() - start < timeoutMs) {
+      await new Promise((r) => setTimeout(r, timeoutMs / 10));
+    }
+    // Then forcibly shut down the gRPC server.
     this.server.forceShutdown();
   }
 }
diff --git a/sdks/typescript/src/apache_beam/worker/worker.ts b/sdks/typescript/src/apache_beam/worker/worker.ts
index 722ff635934..8f30f26126f 100644
--- a/sdks/typescript/src/apache_beam/worker/worker.ts
+++ b/sdks/typescript/src/apache_beam/worker/worker.ts
@@ -88,10 +88,18 @@ export class Worker {
     this.controlChannel.on("end", () => {
       console.log("Control channel closed.");
       for (const dataChannel of this.dataChannels.values()) {
-        dataChannel.close();
+        try {
+          dataChannel.close();
+        } catch (err) {
+          console.log("Best-effort close of data channel failed:", err);
+        }
       }
       for (const stateChannel of this.stateChannels.values()) {
-        stateChannel.close();
+        try {
+          stateChannel.close();
+        } catch (err) {
+          console.log("Best-effort close of state channel failed:", err);
+        }
       }
     });
   }