You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/05/13 22:00:52 UTC
[beam] branch master updated: Add some auto-starting runners to the typescript SDK. (#17580)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 66e85dab799 Add some auto-starting runners to the typescript SDK. (#17580)
66e85dab799 is described below
commit 66e85dab799963368b9d210ac8b1d1d5635efcda
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri May 13 15:00:42 2022 -0700
Add some auto-starting runners to the typescript SDK. (#17580)
Adds out-of-the-box support for FlinkRunner, DataflowRunner, and the Python Universal Local runner. Also adds a DefaultRunner which chooses between the DirectRunner and the ULR depending on the properties of the pipeline.
---
.../sdk/extensions/python/bootstrap_beam_venv.py | 2 +-
sdks/typescript/package-lock.json | 15 ++-
sdks/typescript/package.json | 1 +
.../src/apache_beam/examples/wordcount.ts | 20 +++-
.../typescript/src/apache_beam/runners/dataflow.ts | 44 +++++++++
.../src/apache_beam/runners/direct_runner.ts | 47 +++++++++
sdks/typescript/src/apache_beam/runners/flink.ts | 84 ++++++++++++++++
.../apache_beam/runners/portable_runner/runner.ts | 85 ++++++++++------
sdks/typescript/src/apache_beam/runners/runner.ts | 45 ++++++++-
.../src/apache_beam/runners/universal.ts | 41 ++++++++
.../src/apache_beam/transforms/internal.ts | 2 +
sdks/typescript/src/apache_beam/utils/service.ts | 108 +++++++++++++++++----
sdks/typescript/src/apache_beam/worker/data.ts | 3 +
.../apache_beam/worker/external_worker_service.ts | 47 +++++----
sdks/typescript/src/apache_beam/worker/worker.ts | 12 ++-
15 files changed, 479 insertions(+), 77 deletions(-)
diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
index 08120b84adb..c113676a8f7 100644
--- a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
+++ b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
@@ -77,7 +77,13 @@ def main():
       or options.beam_version.startswith('https://')):
     # It's a path to a tarball.
     beam_version = os.path.basename(options.beam_version)
-    beam_package = options.beam_version
+    if options.beam_version.startswith(('http://', 'https://')):
+      # pip only strips "[extras]" from local paths; for a URL the extras
+      # must be given as a PEP 508 direct reference "name[extras] @ url".
+      beam_package = ('apache_beam[gcp,aws,azure,dataframe] @ ' +
+                      options.beam_version)
+    else:
+      beam_package = options.beam_version + '[gcp,aws,azure,dataframe]'
   else:
     beam_version = options.beam_version
-    beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version
+    beam_package = 'apache_beam[gcp,aws,azure,dataframe]==' + beam_version
diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json
index 49e62618f88..c39002e6001 100644
--- a/sdks/typescript/package-lock.json
+++ b/sdks/typescript/package-lock.json
@@ -1,12 +1,12 @@
{
"name": "apache_beam",
- "version": "0.37.0.dev",
+ "version": "0.38.0",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "apache_beam",
- "version": "0.37.0.dev",
+ "version": "0.38.0",
"dependencies": {
"@grpc/grpc-js": "^1.4.6",
"@protobuf-ts/grpc-transport": "^2.1.0",
@@ -16,6 +16,7 @@
"chai": "^4.3.4",
"date-fns": "^2.28.0",
"fast-deep-equal": "^3.1.3",
+ "find-git-root": "^1.0.4",
"long": "^4.0.0",
"protobufjs": "^6.10.2",
"queue-typescript": "^1.0.1",
@@ -919,6 +920,11 @@
"node": ">=8"
}
},
+ "node_modules/find-git-root": {
+ "version": "1.0.4",
+ "resolved": "https://registry.npmjs.org/find-git-root/-/find-git-root-1.0.4.tgz",
+ "integrity": "sha512-468fmirKKgcrqfZfPn0xIpwZUUsZQcYXfx0RC2/jX39GPz83TwutQNZZhDrI6HqjO8cRejxQVaUY8GQdXopFfA=="
+ },
"node_modules/find-up": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz",
@@ -2942,6 +2948,11 @@
"to-regex-range": "^5.0.1"
}
},
+ "find-git-root": {
+ "version": "1.0.4",
+ "resolved": "https://registry.npmjs.org/find-git-root/-/find-git-root-1.0.4.tgz",
+ "integrity": "sha512-468fmirKKgcrqfZfPn0xIpwZUUsZQcYXfx0RC2/jX39GPz83TwutQNZZhDrI6HqjO8cRejxQVaUY8GQdXopFfA=="
+ },
"find-up": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz",
diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json
index 1e13de36bfc..6ab659ccf2e 100644
--- a/sdks/typescript/package.json
+++ b/sdks/typescript/package.json
@@ -32,6 +32,7 @@
"chai": "^4.3.4",
"date-fns": "^2.28.0",
"fast-deep-equal": "^3.1.3",
+ "find-git-root": "^1.0.4",
"long": "^4.0.0",
"protobufjs": "^6.10.2",
"queue-typescript": "^1.0.1",
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts
index d68d0f25600..961afb43e9b 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts
@@ -16,10 +16,24 @@
* limitations under the License.
*/
-// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+// Run directly with
+//
+// node dist/src/apache_beam/examples/wordcount.js
+//
+// A different runner can be chosen via a --runner argument, e.g.
+//
+// node dist/src/apache_beam/examples/wordcount.js --runner=flink
+//
+// Recognized runners are default, direct, universal, flink and dataflow.
+//
+// To run on Dataflow, pass the required arguments:
+//
+// node dist/src/apache_beam/examples/wordcount.js --runner=dataflow --project=PROJECT_ID --tempLocation=gs://BUCKET/DIR --region=us-central1
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam?
+
+import * as yargs from "yargs";
import * as beam from "../../apache_beam";
-import { DirectRunner } from "../runners/direct_runner";
+import { createRunner } from "../runners/runner";
import { count } from "../transforms/combiners";
import { GroupBy } from "../transforms/group_and_combine";
@@ -45,7 +59,7 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
}
async function main() {
- await new DirectRunner().run((root) => {
+ // createRunner picks the runner from the parsed command-line flags (--runner).
+ await createRunner(yargs.argv).run((root) => {
const lines = root.apply(
new beam.Create([
"In the beginning God created the heaven and the earth.",
diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts
new file mode 100644
index 00000000000..958eb99c956
--- /dev/null
+++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Pipeline } from "../internal/pipeline";
+import { PipelineResult, Runner } from "./runner";
+import { PortableRunner } from "./portable_runner/runner";
+import { PythonService } from "../utils/service";
+
+/**
+ * Returns a Runner that submits pipelines to Dataflow through a job service
+ * started from Python (apache_beam.runners.dataflow.dataflow_job_service).
+ *
+ * `runnerOptions` become the PortableRunner's default options. They must
+ * include the GCP `project`, a `tempLocation` (e.g. a gs:// path) and a
+ * `region`, and may carry any other pipeline options.
+ *
+ * Each runPipeline call constructs a new PythonService. Constructing it
+ * synchronously runs the Beam Python bootstrap script. The job service itself
+ * is started when the PortableRunner first needs a client.
+ */
+export function dataflowRunner(runnerOptions: {
+ project: string;
+ tempLocation: string;
+ region: string;
+ [others: string]: any;
+}): Runner {
+ return new (class extends Runner {
+ async runPipeline(
+ pipeline: Pipeline,
+ options: Object = {}
+ ): Promise<PipelineResult> {
+ return new PortableRunner(
+ runnerOptions as any,
+ new PythonService("apache_beam.runners.dataflow.dataflow_job_service", [
+ "--port",
+ "{{PORT}}",
+ ])
+ ).runPipeline(pipeline, options);
+ }
+ })();
+}
diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
index a1a10621af9..ff203d06c41 100644
--- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
@@ -44,14 +44,61 @@ import {
} from "../values";
import { PaneInfoCoder } from "../coders/standard_coders";
import { Coder, Context as CoderContext } from "../coders/coders";
+import * as environments from "../internal/environments";
import { serializeFn, deserializeFn } from "../internal/serialize";
+// Pipeline requirement URNs this runner can honor. The list is currently
+// empty, so any requirement a pipeline declares is reported as unsupported.
+const SUPPORTED_REQUIREMENTS: string[] = [];
+
+/** Creates a DirectRunner; createRunner() and defaultRunner() use this factory. */
+export function directRunner(options: Object = {}): Runner {
+ return new DirectRunner(options);
+}
+
export class DirectRunner extends Runner {
// All the operators for a given pipeline should share the same state.
// This global mapping allows operators to look up a shared state object for
// a given pipeline on deserialization.
static inMemoryStatesRefs: Map<string, InMemoryStateProvider> = new Map();
+ // NOTE(review): `options` is stored, but nothing in the visible code reads it.
+ constructor(private options: Object = {}) {
+ super();
+ }
+
+ /**
+ * Returns the features of `pipeline` this runner cannot execute, as strings
+ * (requirement URNs, environment URNs, or "MergeStatus=<n>"). An empty list
+ * means the pipeline can run here. defaultRunner() uses this to decide
+ * whether to fall back to the universal runner.
+ */
+ unsupportedFeatures(pipeline, options: Object = {}): string[] {
+ return [...this.unsupportedFeaturesIter(pipeline, options)];
+ }
+
+ /** Lazily yields each unsupported feature; `options` is currently unused. */
+ *unsupportedFeaturesIter(pipeline, options: Object = {}) {
+ const proto: runnerApi.Pipeline = pipeline.proto;
+ // Declared requirements that are not in SUPPORTED_REQUIREMENTS.
+ for (const requirement of proto.requirements) {
+ if (!SUPPORTED_REQUIREMENTS.includes(requirement)) {
+ yield requirement;
+ }
+ }
+
+ // Any environment other than the default TypeScript one (environments
+ // with an empty urn are ignored).
+ for (const env of Object.values(proto.components!.environments)) {
+ if (
+ env.urn &&
+ env.urn != environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN
+ ) {
+ yield env.urn;
+ }
+ }
+
+ // Windowing strategies whose merge status is not unspecified,
+ // non-merging or already-merged, i.e. windows that still need merging.
+ for (const windowing of Object.values(
+ proto.components!.windowingStrategies
+ )) {
+ if (
+ ![
+ runnerApi.MergeStatus_Enum.UNSPECIFIED,
+ runnerApi.MergeStatus_Enum.NON_MERGING,
+ runnerApi.MergeStatus_Enum.ALREADY_MERGED,
+ ].includes(windowing.mergeStatus)
+ ) {
+ yield "MergeStatus=" + windowing.mergeStatus;
+ }
+ }
+ }
+
async runPipeline(p): Promise<PipelineResult> {
// console.dir(p.proto, { depth: null });
diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts
new file mode 100644
index 00000000000..4acb68e642f
--- /dev/null
+++ b/sdks/typescript/src/apache_beam/runners/flink.ts
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const os = require("os");
+const path = require("path");
+
+import { Pipeline } from "../internal/pipeline";
+import { PipelineResult, Runner } from "./runner";
+import { PortableRunner } from "./portable_runner/runner";
+import { JavaJarService } from "../utils/service";
+
+// flinkMaster values for which flinkRunner defaults environmentType to
+// LOOPBACK (presumably because Flink then runs locally and can reach the
+// in-process loopback workers; confirm against the Flink job server docs).
+const MAGIC_HOST_NAMES = ["[local]", "[auto]"];
+
+// These should stay in sync with gradle.properties.
+const PUBLISHED_FLINK_VERSIONS = ["1.12", "1.13", "1.14"];
+
+// Defaults: a "[local]" Flink master and the last (newest listed) version.
+const defaultOptions = {
+ flinkMaster: "[local]",
+ flinkVersion: PUBLISHED_FLINK_VERSIONS[PUBLISHED_FLINK_VERSIONS.length - 1],
+};
+
+/**
+ * Returns a Runner that executes pipelines on Flink through a Flink job
+ * server jar started as a subprocess (JavaJarService).
+ *
+ * Options are merged with precedence defaultOptions < runnerOptions <
+ * per-run options. Keys read here are flinkMaster, flinkVersion,
+ * flinkJobServerJar (an explicit jar path), environmentType and
+ * artifactsDir.
+ */
+export function flinkRunner(runnerOptions: Object = {}): Runner {
+ return new (class extends Runner {
+ async runPipeline(
+ pipeline: Pipeline,
+ options: Object = {}
+ ): Promise<PipelineResult> {
+ const allOptions = {
+ ...defaultOptions,
+ ...runnerOptions,
+ ...options,
+ } as any;
+ // Default to LOOPBACK workers when flinkMaster is one of MAGIC_HOST_NAMES.
+ if (
+ !allOptions.environmentType &&
+ MAGIC_HOST_NAMES.includes(allOptions.flinkMaster)
+ ) {
+ allOptions.environmentType = "LOOPBACK";
+ }
+ // NOTE(review): this temporary directory is never removed afterwards.
+ if (!allOptions.artifactsDir) {
+ allOptions.artifactsDir = fs.mkdtempSync(
+ path.join(os.tmpdir(), "flinkArtifactsDir")
+ );
+ }
+
+ // Use an explicitly given jar, else resolve the job server jar for the
+ // requested Flink version via JavaJarService.gradleToJar/cachedJar.
+ const jobServerJar =
+ allOptions.flinkJobServerJar ||
+ (await JavaJarService.cachedJar(
+ JavaJarService.gradleToJar(
+ `runners:flink:${allOptions.flinkVersion}:job-server:shadowJar`
+ )
+ ));
+ // "{{PORT}}" is replaced with a free port when the service starts; port
+ // 0 for the other services presumably lets the job server pick its own.
+ const jobServer = new JavaJarService(jobServerJar, [
+ "--flink-master",
+ allOptions.flinkMaster,
+ "--artifacts-dir",
+ allOptions.artifactsDir,
+ "--job-port",
+ "{{PORT}}",
+ "--artifact-port",
+ "0",
+ "--expansion-port",
+ "0",
+ ]);
+
+ // allOptions already includes `options`, so it is not passed again here.
+ return new PortableRunner(allOptions, jobServer).runPipeline(pipeline);
+ }
+ })();
+}
diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
index 3081249539f..f5281aa969d 100644
--- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts
@@ -28,11 +28,12 @@ import { ArtifactStagingServiceClient } from "../../proto/beam_artifact_api.clie
import { Pipeline } from "../../internal/pipeline";
import { PipelineResult, Runner } from "../runner";
import { PipelineOptions } from "../../options/pipeline_options";
-import { JobState_Enum } from "../../proto/beam_job_api";
+import { JobState_Enum, JobStateEvent } from "../../proto/beam_job_api";
import { ExternalWorkerPool } from "../../worker/external_worker_service";
import * as environments from "../../internal/environments";
import * as artifacts from "../artifacts";
+import { Service as JobService } from "../../utils/service";
const TERMINAL_STATES = [
JobState_Enum.DONE,
@@ -42,19 +43,22 @@ const TERMINAL_STATES = [
JobState_Enum.DRAINED,
];
+type completionCallback = (terminalState: JobStateEvent) => Promise<unknown>;
+
 class PortableRunnerPipelineResult implements PipelineResult {
   jobId: string;
   runner: PortableRunner;
-  workers?: ExternalWorkerPool;
+  completionCallbacks: completionCallback[];
+  terminalState?: JobStateEvent;

   constructor(
     runner: PortableRunner,
     jobId: string,
-    workers: ExternalWorkerPool | undefined = undefined
+    completionCallbacks: completionCallback[]
   ) {
     this.runner = runner;
     this.jobId = jobId;
-    this.workers = workers;
+    this.completionCallbacks = completionCallbacks;
   }

   static isTerminal(state: JobState_Enum) {
@@ -62,13 +66,27 @@ class PortableRunnerPipelineResult implements PipelineResult {
   }

   async getState() {
+    if (this.terminalState) {
+      return this.terminalState;
+    }
     const state = await this.runner.getJobState(this.jobId);
-    if (
-      this.workers != undefined &&
-      PortableRunnerPipelineResult.isTerminal(state.state)
-    ) {
-      this.workers.stop();
-      this.workers = undefined;
+    // Re-check terminalState: a concurrent getState() call may already have
+    // seen the terminal state (and started cleanup) while this one awaited.
+    if (
+      !this.terminalState &&
+      PortableRunnerPipelineResult.isTerminal(state.state)
+    ) {
+      this.terminalState = state;
+      // Cleanup is best effort: run every callback even if an earlier one
+      // fails, so e.g. a failed job-service shutdown doesn't leave the
+      // loopback worker pool running.
+      for (const callback of this.completionCallbacks) {
+        try {
+          await callback(state);
+        } catch (err) {
+          console.error("Error cleaning up after job completion:", err);
+        }
+      }
     }
     return state;
   }
@@ -96,11 +102,12 @@ class PortableRunnerPipelineResult implements PipelineResult {
 }

 export class PortableRunner extends Runner {
-  client: JobServiceClient;
+  client?: JobServiceClient;
   defaultOptions: any;

   constructor(
-    options: string | { jobEndpoint: string; [others: string]: any }
+    options: string | { jobEndpoint: string; [others: string]: any },
+    private jobService: JobService | undefined = undefined
   ) {
     super();
     if (typeof options == "string") {
@@ -108,16 +115,36 @@ export class PortableRunner extends Runner {
     } else if (options) {
       this.defaultOptions = options;
     }
-    this.client = new JobServiceClient(
-      new GrpcTransport({
-        host: this.defaultOptions?.jobEndpoint,
-        channelCredentials: ChannelCredentials.createInsecure(),
-      })
-    );
+  }
+
+  /**
+   * Returns the job service client, creating it on first use.
+   *
+   * If a jobService was supplied, it is started here and the address it
+   * returns is used as the job endpoint.
+   */
+  async getClient(): Promise<JobServiceClient> {
+    if (!this.client) {
+      if (this.jobService) {
+        // Copy instead of assigning into defaultOptions: it may be the
+        // caller's own options object, and is undefined if none was given.
+        this.defaultOptions = {
+          ...this.defaultOptions,
+          jobEndpoint: await this.jobService.start(),
+        };
+      }
+      this.client = new JobServiceClient(
+        new GrpcTransport({
+          host: this.defaultOptions?.jobEndpoint,
+          channelCredentials: ChannelCredentials.createInsecure(),
+        })
+      );
+    }
+    return this.client;
   }

   async getJobState(jobId: string) {
-    const call = this.client.getState({ jobId });
+    const call = (await this.getClient()).getState({ jobId });
     return await call.response;
   }
@@ -138,11 +154,18 @@ export class PortableRunner extends Runner {
options = { ...this.defaultOptions, ...options };
}
- const use_loopback_service =
- (options as any)?.environmentType == "LOOPBACK";
- const workers = use_loopback_service ? new ExternalWorkerPool() : undefined;
- if (use_loopback_service) {
- workers!.start();
+ // Cleanup actions, run by the returned PipelineResult once it observes a
+ // terminal job state: stop the job service (if this runner was given one)
+ // and the loopback worker pool (if one is started below).
+ // NOTE(review): if anything below throws before the result is returned,
+ // these never run and the job service / worker pool are leaked.
+ const completionCallbacks: completionCallback[] = [];
+
+ if (this.jobService) {
+ const jobService = this.jobService;
+ completionCallbacks.push(() => jobService.stop());
+ }
+
+ // For LOOPBACK, serve SDK workers from this process; the default
+ // TypeScript environment is pointed at this address below.
+ let loopbackAddress: string | undefined = undefined;
+ if ((options as any)?.environmentType == "LOOPBACK") {
+ const workers = new ExternalWorkerPool();
+ loopbackAddress = await workers.start();
+ completionCallbacks.push(() => workers.stop());
}
// Replace the default environment according to the pipeline options.
@@ -151,9 +174,9 @@ export class PortableRunner extends Runner {
pipeline.components!.environments
)) {
if (env.urn == environments.TYPESCRIPT_DEFAULT_ENVIRONMENT_URN) {
- if (use_loopback_service) {
+ if (loopbackAddress) {
pipeline.components!.environments[envId] =
- environments.asExternalEnvironment(env, workers!.address);
+ environments.asExternalEnvironment(env, loopbackAddress);
} else {
pipeline.components!.environments[envId] =
environments.asDockerEnvironment(
@@ -166,6 +189,7 @@ export class PortableRunner extends Runner {
}
// Inform the runner that we'd like to execute this pipeline.
+ console.debug("Preparing job.");
let message: PrepareJobRequest = {
pipeline,
jobName: (options as any)?.jobName || "",
@@ -182,10 +206,12 @@ export class PortableRunner extends Runner {
)
);
}
- const prepareResponse = await this.client.prepare(message).response;
+ // Starts the job service first if this runner was given one.
+ const client = await this.getClient();
+ const prepareResponse = await client.prepare(message).response;
// Allow the runner to fetch any artifacts it can't interpret.
if (prepareResponse.artifactStagingEndpoint) {
+ console.debug("Staging artifacts");
await artifacts.offerArtifacts(
new ArtifactStagingServiceClient(
new GrpcTransport({
@@ -198,7 +224,8 @@ export class PortableRunner extends Runner {
}
// Actually kick off the job.
- const runCall = this.client.run({
+ console.debug("Running job.");
+ const runCall = client.run({
preparationId: prepareResponse.preparationId,
retrievalToken: "",
});
@@ -208,6 +235,6 @@ export class PortableRunner extends Runner {
// If desired, the user can use this handle to await job completion, but
// this function returns as soon as the job is successfully started, not
// once the job has completed.
- return new PortableRunnerPipelineResult(this, jobId, workers);
+ return new PortableRunnerPipelineResult(this, jobId, completionCallbacks);
}
}
diff --git a/sdks/typescript/src/apache_beam/runners/runner.ts b/sdks/typescript/src/apache_beam/runners/runner.ts
index 272ef840783..7fc2ad79477 100644
--- a/sdks/typescript/src/apache_beam/runners/runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/runner.ts
@@ -25,11 +25,36 @@ export interface PipelineResult {
   waitUntilFinish(duration?: number): Promise<JobState_Enum>;
 }

+/**
+ * Creates a Runner from command-line style options (e.g. parsed yargs argv).
+ *
+ * `options.runner` selects the runner: "default" (also used when unset),
+ * "direct", "universal", "flink" or "dataflow". The whole options object is
+ * passed to the selected runner's factory function.
+ *
+ * @throws Error if `options.runner` names an unknown runner.
+ */
+export function createRunner(options: any = {}): Runner {
+  // Runner modules are require()d only when their runner is selected.
+  const factories: { [name: string]: () => (options: any) => Runner } = {
+    default: () => defaultRunner,
+    direct: () => require("./direct_runner").directRunner,
+    universal: () => require("./universal").universalRunner,
+    flink: () => require("./flink").flinkRunner,
+    dataflow: () => require("./dataflow").dataflowRunner,
+  };
+  const runnerName = options?.runner ?? "default";
+  if (!Object.prototype.hasOwnProperty.call(factories, runnerName)) {
+    throw new Error("Unknown runner: " + runnerName);
+  }
+  return factories[runnerName]()(options);
+}
+
/**
* A Runner is the object that takes a pipeline definition and actually
* executes, e.g. locally or on a distributed system.
*/
-export class Runner {
+export abstract class Runner {
/**
* Runs the transform.
*
@@ -64,10 +82,27 @@ export class Runner {
return this.runPipeline(p);
}
- protected async runPipeline(
+ // Implemented by each concrete runner. Public, so wrapper runners such as
+ // defaultRunner can delegate to another runner's runPipeline.
+ abstract runPipeline(
pipeline: Pipeline,
options?: PipelineOptions
- ): Promise<PipelineResult> {
- throw new Error("Not implemented.");
- }
+ ): Promise<PipelineResult>;
+}
+
+/**
+ * Returns a Runner that picks an implementation per pipeline. It uses the
+ * direct runner when that runner reports no unsupported features for the
+ * pipeline, and otherwise the universal runner (a Python-hosted local job
+ * service). `defaultOptions` are passed to whichever runner is constructed.
+ */
+export function defaultRunner(defaultOptions: Object): Runner {
+ return new (class extends Runner {
+ async runPipeline(
+ pipeline: Pipeline,
+ options: Object = {}
+ ): Promise<PipelineResult> {
+ const directRunner =
+ require("./direct_runner").directRunner(defaultOptions);
+ if (directRunner.unsupportedFeatures(pipeline, options).length == 0) {
+ return directRunner.runPipeline(pipeline, options);
+ } else {
+ return require("./universal")
+ .universalRunner(defaultOptions)
+ .runPipeline(pipeline, options);
+ }
+ }
+ })();
}
diff --git a/sdks/typescript/src/apache_beam/runners/universal.ts b/sdks/typescript/src/apache_beam/runners/universal.ts
new file mode 100644
index 00000000000..c2c4db14c69
--- /dev/null
+++ b/sdks/typescript/src/apache_beam/runners/universal.ts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Pipeline } from "../internal/pipeline";
+import { PipelineResult, Runner } from "./runner";
+import { PortableRunner } from "./portable_runner/runner";
+import { PythonService } from "../utils/service";
+
+/**
+ * Returns a Runner that runs pipelines on a local job service started from
+ * Python (apache_beam.runners.portability.local_job_service_main), via a
+ * PortableRunner that uses `runnerOptions` as its default options.
+ *
+ * Each runPipeline call constructs a new PythonService. Constructing it
+ * synchronously runs the Beam Python bootstrap script. The service process is
+ * started when the PortableRunner first needs a client.
+ */
+export function universalRunner(runnerOptions: {
+ [others: string]: any;
+}): Runner {
+ return new (class extends Runner {
+ async runPipeline(
+ pipeline: Pipeline,
+ options: Object = {}
+ ): Promise<PipelineResult> {
+ return new PortableRunner(
+ runnerOptions as any,
+ new PythonService(
+ "apache_beam.runners.portability.local_job_service_main",
+ ["--port", "{{PORT}}"]
+ )
+ ).runPipeline(pipeline, options);
+ }
+ })();
+}
diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts
index 232cfd2832f..27f03c836ab 100644
--- a/sdks/typescript/src/apache_beam/transforms/internal.ts
+++ b/sdks/typescript/src/apache_beam/transforms/internal.ts
@@ -55,6 +55,7 @@ export class Impulse extends PTransform<Root, PCollection<Uint8Array>> {
urn: Impulse.urn,
payload: urns.IMPULSE_BUFFER,
});
+ // Impulse is executed by the runner itself, not an SDK harness, so it is
+ // given no environment (presumably per PTransform.environment_id in
+ // beam_runner_api.proto, which runner-implemented transforms must omit).
+ transformProto.environmentId = "";
return pipeline.createPCollectionInternal(new BytesCoder());
}
}
@@ -134,6 +135,7 @@ export class GroupByKey<K, V> extends PTransform<
urn: GroupByKey.urn,
payload: undefined!,
});
+ // Like Impulse, GroupByKey is a runner-executed primitive: no environment.
+ transformProto.environmentId = "";
// TODO: (Cleanup) warn about BsonObjectCoder and (non)deterministic key ordering?
const keyCoder = pipeline.getCoder(inputCoderProto.componentCoderIds[0]);
diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts
index 693731dc865..bb5554e3123 100644
--- a/sdks/typescript/src/apache_beam/utils/service.ts
+++ b/sdks/typescript/src/apache_beam/utils/service.ts
@@ -22,6 +22,7 @@ const os = require("os");
const net = require("net");
const path = require("path");
const childProcess = require("child_process");
+const findGitRoot = require("find-git-root");
// TODO: (Typescript) Why can't the var above be used as a namespace?
import { ChildProcess } from "child_process";
@@ -47,17 +48,45 @@ export class SubprocessService {
   process: ChildProcess;
   cmd: string;
   args: string[];
+  name: string;

-  constructor(cmd: string, args: string[]) {
+  constructor(
+    cmd: string,
+    args: string[],
+    name: string | undefined = undefined
+  ) {
     this.cmd = cmd;
     this.args = args;
+    this.name = name || cmd;
+  }
+
+  /**
+   * Returns a TCP port that was free when checked, found by binding a
+   * throwaway server to port 0 and closing it again.
+   *
+   * The port is released before the caller uses it, so another process could
+   * claim it in between.
+   */
+  static async freePort(): Promise<number> {
+    return new Promise((resolve, reject) => {
+      const srv = net.createServer();
+      // Without an "error" listener, a failed listen() is raised as an
+      // uncaught exception instead of rejecting this promise.
+      srv.once("error", reject);
+      srv.listen(0, () => {
+        const port = srv.address().port;
+        srv.close(() => resolve(port));
+      });
+    });
   }
async start() {
- // TODO: (Cleanup) Choose a free port.
const host = "localhost";
- const port = "7778";
- console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+ // Pick a free port and substitute it for every "{{PORT}}" placeholder in
+ // the arguments passed to the subprocess.
+ const port = (await SubprocessService.freePort()).toString();
+ console.debug(
+ this.cmd,
+ this.args.map((arg) => arg.replace("{{PORT}}", port))
+ );
this.process = childProcess.spawn(
this.cmd,
this.args.map((arg) => arg.replace("{{PORT}}", port)),
@@ -67,7 +86,11 @@ export class SubprocessService {
);
try {
+ // Wait up to 10000 (presumably ms) for the port to accept connections;
+ // on failure the subprocess is killed and the error rethrown.
+ console.debug(
+ `Waiting for ${this.name} to be available on port ${port}.`
+ );
await this.portReady(port, host, 10000);
+ console.debug(`Service ${this.name} available.`);
} catch (error) {
this.process.kill();
throw error;
@@ -77,6 +100,7 @@ export class SubprocessService {
}
async stop() {
+ // NOTE(review): the other lifecycle messages here use console.debug.
+ console.log(`Tearing down ${this.name}.`);
this.process.kill();
}
@@ -91,9 +115,9 @@ export class SubprocessService {
try {
await new Promise<void>((resolve, reject) => {
const socket = net.createConnection(port, host, () => {
- resolve();
- socket.end();
connected = true;
+ // Record success before resolving; presumably the surrounding retry
+ // loop checks `connected` once this promise settles.
+ socket.end();
+ resolve();
});
socket.on("error", (err) => {
reject(err);
@@ -123,10 +147,12 @@ export function serviceProviderFromJavaGradleTarget(
};
}
+// Root of the on-disk cache, shared by downloaded jars (JAR_CACHE) and
+// Python venvs (PythonService.VENV_CACHE).
+const BEAM_CACHE = path.join(os.homedir(), ".apache_beam", "cache");
+
export class JavaJarService extends SubprocessService {
static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
static BEAM_GROUP_ID = "org.apache.beam";
- static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+ static JAR_CACHE = path.join(BEAM_CACHE, "jars");
constructor(jar: string, args: string[] | undefined = undefined) {
if (args == undefined) {
@@ -185,16 +211,7 @@ export class JavaJarService extends SubprocessService {
}
const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
- // TODO: Do this more robustly, e.g. use the git root.
- const projectRoot = path.resolve(
- __dirname,
- "..",
- "..",
- "..",
- "..",
- "..",
- ".."
- );
+ // findGitRoot presumably returns the enclosing .git directory, so its
+ // parent is the Beam checkout root. NOTE(review): this presumably fails
+ // outside a git checkout (e.g. when installed as an npm package).
+ const projectRoot = path.dirname(findGitRoot(__dirname));
const localPath = path.join(
projectRoot,
gradlePackage.replaceAll(":", path.sep),
@@ -256,3 +273,78 @@ export class JavaJarService extends SubprocessService {
     );
   }
 }
+
+/**
+ * A SubprocessService that runs `python -u -m <module> <args>` with a
+ * Beam-enabled Python interpreter located by beamPython().
+ */
+export class PythonService extends SubprocessService {
+  static VENV_CACHE = path.join(BEAM_CACHE, "venvs");
+
+  /** Returns "python3" or "python", whichever first runs `--version` cleanly. */
+  static whichPython(): string {
+    for (const bin of ["python3", "python"]) {
+      try {
+        const result = childProcess.spawnSync(bin, ["--version"]);
+        if (result.status == 0) {
+          return bin;
+        }
+      } catch (err) {
+        // Try the next one.
+      }
+    }
+    throw new Error("Can't find a Python executable.");
+  }
+
+  /**
+   * Runs bootstrap_beam_venv.py from this Beam checkout and returns the last
+   * line of its stdout, which is used as the Python executable's path.
+   *
+   * @throws Error describing the spawn error, or the exit status and stderr,
+   *     if the script does not exit successfully.
+   */
+  static beamPython(): string {
+    const projectRoot = path.dirname(findGitRoot(__dirname));
+    // TODO: Package this up with the npm.
+    const bootstrapScript = path.join(
+      projectRoot,
+      "sdks",
+      "java",
+      "extensions",
+      "python",
+      "src",
+      "main",
+      "resources",
+      "org",
+      "apache",
+      "beam",
+      "sdk",
+      "extensions",
+      "python",
+      "bootstrap_beam_venv.py"
+    );
+    console.debug("Invoking Python bootstrap script.");
+    const result = childProcess.spawnSync(
+      PythonService.whichPython(),
+      [bootstrapScript],
+      { encoding: "latin1" }
+    );
+    if (result.status == 0) {
+      console.debug(result.stdout);
+      const lines = result.stdout.trim().split("\n");
+      return lines[lines.length - 1];
+    }
+    // result.output is an array ([stdin, stdout, stderr]) that would be
+    // stringified as a comma-joined blob; report something readable instead.
+    const details = result.error
+      ? String(result.error)
+      : `exit status ${result.status}: ${result.stderr}`;
+    throw new Error(
+      `Python bootstrap script ${bootstrapScript} failed: ${details}`
+    );
+  }
+
+  constructor(module: string, args: string[] = []) {
+    super(PythonService.beamPython(), ["-u", "-m", module].concat(args));
+  }
+}
diff --git a/sdks/typescript/src/apache_beam/worker/data.ts b/sdks/typescript/src/apache_beam/worker/data.ts
index b68ba41ff46..436010c109a 100644
--- a/sdks/typescript/src/apache_beam/worker/data.ts
+++ b/sdks/typescript/src/apache_beam/worker/data.ts
@@ -72,6 +72,9 @@ export class MultiplexingDataChannel {
}
}
});
+ // Log stream errors. On a Node EventEmitter, an "error" event with no
+ // listener is thrown as an uncaught exception (assumes dataChannel is such
+ // an emitter, e.g. a gRPC duplex stream; confirm).
+ this.dataChannel.on("error", (err) => {
+ console.log("Data channel error", err);
+ });
}
close() {
diff --git a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts
index 02f77e0d877..f20b8fb904a 100644
--- a/sdks/typescript/src/apache_beam/worker/external_worker_service.ts
+++ b/sdks/typescript/src/apache_beam/worker/external_worker_service.ts
@@ -36,13 +36,19 @@ export class ExternalWorkerPool {
   server: grpc.Server;
   workers: Map<string, Worker> = new Map();

-  // TODO: (Cleanup) Choose a free port.
-  constructor(address: string = "localhost:5555") {
+  constructor(address: string = "localhost:0") {
     this.address = address;
   }

-  start() {
-    console.log("Starting the workers at ", this.address);
+  /**
+   * Starts the worker-pool gRPC server and resolves to its address.
+   *
+   * With the default address "localhost:0" the OS picks a free port; once
+   * bound, this.address is updated to the actual "localhost:<port>" and the
+   * server is started.
+   */
+  async start(): Promise<string> {
+    console.log("Starting loopback workers at ", this.address);
     const this_ = this;

     this.server = new grpc.Server();
@@ -87,23 +93,42 @@ export class ExternalWorkerPool {
       },
     };

-    this.server.bindAsync(
-      this.address,
-      grpc.ServerCredentials.createInsecure(),
-      (err: Error | null, port: number) => {
-        if (err) {
-          console.error(`Server error: ${err.message}`);
-        } else {
-          console.log(`Server bound on port: ${port}`);
-          this_.server.start();
-        }
-      }
-    );
-
     this.server.addService(beamFnExternalWorkerPoolDefinition, workerService);
+
+    return new Promise((resolve, reject) => {
+      this.server.bindAsync(
+        this.address,
+        grpc.ServerCredentials.createInsecure(),
+        (err: Error | null, port: number) => {
+          if (err) {
+            // Reject with an Error, not a bare string, so callers get a stack.
+            reject(
+              new Error(`Error starting loopback service: ${err.message}`)
+            );
+          } else {
+            console.log(`Server bound on port: ${port}`);
+            this_.address = `localhost:${port}`;
+            this_.server.start();
+            resolve(this_.address);
+          }
+        }
+      );
+    });
   }

-  stop() {
+  /**
+   * Shuts down the worker pool, first giving the runner up to timeoutMs
+   * milliseconds to stop the workers gracefully.
+   */
+  async stop(timeoutMs = 100) {
+    console.debug("Shutting down external workers.");
+    // Poll, yielding to the event loop, until no workers remain or the
+    // timeout expires. Workers are presumably removed from this.workers as
+    // the runner stops them.
+    const start = Date.now();
+    while (this.workers.size && Date.now() - start < timeoutMs) {
+      await new Promise((r) => setTimeout(r, timeoutMs / 10));
+    }
     this.server.forceShutdown();
   }
}
diff --git a/sdks/typescript/src/apache_beam/worker/worker.ts b/sdks/typescript/src/apache_beam/worker/worker.ts
index 722ff635934..8f30f26126f 100644
--- a/sdks/typescript/src/apache_beam/worker/worker.ts
+++ b/sdks/typescript/src/apache_beam/worker/worker.ts
@@ -88,10 +88,21 @@ export class Worker {
     this.controlChannel.on("end", () => {
       console.log("Control channel closed.");
       for (const dataChannel of this.dataChannels.values()) {
-        dataChannel.close();
+        // Best effort: a failure closing one channel must not prevent the
+        // remaining channels from being closed.
+        try {
+          dataChannel.close();
+        } catch (err) {
+          console.error("Error closing data channel:", err);
+        }
       }
       for (const stateChannel of this.stateChannels.values()) {
-        stateChannel.close();
+        // Best effort, as for the data channels above.
+        try {
+          stateChannel.close();
+        } catch (err) {
+          console.error("Error closing state channel:", err);
+        }
       }
     });
}