Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/11 21:12:42 UTC

[GitHub] [beam] robertwb opened a new pull request, #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

robertwb opened a new pull request, #17341:
URL: https://github.com/apache/beam/pull/17341

   Coordination file for dividing up the review can be found at https://docs.google.com/spreadsheets/d/16nqMaBIIyM3s0wq0eS10e-eqktuS8HYpCRkRDHmAMf8/edit
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r855155414


##########
sdks/typescript/test/core_test.ts:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import * as assert from "assert";
+import { BytesCoder } from "../src/apache_beam/coders/standard_coders";
+import { Pipeline } from "../src/apache_beam/internal/pipeline";
+// TODO(pabloem): Fix installation.

Review Comment:
   @pabloem what is this TODO? Can we either fix it or move it to a JIRA?



##########
sdks/typescript/test/serialize_test.ts:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { expect } from "chai";
+import {
+  deserialize,
+  serialize,
+  BuiltinList,
+  generateDefaultBuiltins,
+} from "serialize-closures";
+
+describe("serialization tests", function () {
+  function roundtrip(value, builtins?: BuiltinList) {
+    return deserialize(
+      JSON.parse(JSON.stringify(serialize(value, builtins))),
+      builtins
+    );
+  }
+
+  function expectRoundtrip(value, builtins?: BuiltinList) {
+    expect(roundtrip(value, builtins)).to.deep.equal(value);
+  }
+
+  function* myGenerator() {
+    yield 42;
+    yield 84;
+  }
+
+  function simpleGenerator() {
+    expect(myGenerator().next()).to.equal(42);
+  }
+
+  function roundtripGeneratorConstructor() {
+    expect(roundtrip(myGenerator)().next()).to.equal(42);
+  }
+
+  function roundtripGeneratorInProgress() {

Review Comment:
   @kerrydc it looks like these tests never actually call the generator or the helper functions defined above - did you mean to write something like this:
   
   ```
   it("serializes and deserializes in-progress generators", function () {
     const gen = myGenerator();
     // next() returns an IteratorResult, so compare its .value field
     expect(gen.next().value).to.equal(42);
     expect(roundtrip(gen).next().value).to.equal(84);
   });
   ```
   
   Same question for the other generator-related functions.
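
For reference, a generator's `next()` returns an `IteratorResult` object of the form `{ value, done }`, so a strict-equality assertion has to compare the `.value` field rather than the result object itself. The sketch below shows this with a hypothetical stand-in for `myGenerator`. It does not use `serialize-closures` and says nothing about whether in-progress generators survive a roundtrip.

```typescript
// Hypothetical stand-in for the generator defined in the test file.
function* myGenerator() {
  yield 42;
  yield 84;
}

const gen = myGenerator();

// Each call to next() returns an IteratorResult, not the yielded number.
const first = gen.next();
const second = gen.next();
const finished = gen.next();

// Log the value/done pair for each step of the iteration.
console.log(first.value, first.done);
console.log(second.value, second.done);
console.log(finished.value, finished.done);
```

Comparing `first` directly to `42` with strict equality would fail, because `first` is an object; only `first.value` holds the yielded number.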



##########
sdks/typescript/test/combine_test.ts:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import { DirectRunner } from "../src/apache_beam/runners/direct_runner";
+import * as testing from "../src/apache_beam/testing/assert";
+import { KV } from "../src/apache_beam/values";
+
+import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runner";
+import * as combiners from "../src/apache_beam/transforms/combiners";
+import {
+  CombineFn,
+  GroupBy,
+  GroupGlobally,
+  CountPerElement,
+  CountGlobally,
+} from "../src/apache_beam/transforms/group_and_combine";
+
+describe("Apache Beam combiners", function () {
+  it("runs wordcount with a countPerKey transform and asserts the result", async function () {
+    //         await new PortableRunner('localhost:3333').run(

Review Comment:
   @pabloem Do we need this?



##########
sdks/typescript/test/core_test.ts:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import * as assert from "assert";
+import { BytesCoder } from "../src/apache_beam/coders/standard_coders";
+import { Pipeline } from "../src/apache_beam/internal/pipeline";
+// TODO(pabloem): Fix installation.
+
+describe("core module", function () {
+  describe("runs a basic impulse expansion", function () {
+    it("runs a basic Impulse expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p).apply(new beam.Impulse());
+
+      assert.equal(res.type, "pcollection");
+      assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder());
+    });
+    it("runs a ParDo expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p)
+        .apply(new beam.Impulse())
+        .map(function (v: any) {
+          return v * 2;
+        });
+    });
+    it("runs a GroupBy expansion", function () {});

Review Comment:
   @pabloem We should either remove this empty test or fill it in.



##########
sdks/typescript/test/core_test.ts:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import * as assert from "assert";
+import { BytesCoder } from "../src/apache_beam/coders/standard_coders";
+import { Pipeline } from "../src/apache_beam/internal/pipeline";
+// TODO(pabloem): Fix installation.
+
+describe("core module", function () {
+  describe("runs a basic impulse expansion", function () {
+    it("runs a basic Impulse expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p).apply(new beam.Impulse());
+
+      assert.equal(res.type, "pcollection");
+      assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder());
+    });
+    it("runs a ParDo expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p)
+        .apply(new beam.Impulse())
+        .map(function (v: any) {
+          return v * 2;
+        });

Review Comment:
   @pabloem Should we be doing some assertions here?



##########
sdks/typescript/test/serialize_test.ts:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { expect } from "chai";
+import {
+  deserialize,
+  serialize,
+  BuiltinList,
+  generateDefaultBuiltins,
+} from "serialize-closures";
+
+describe("serialization tests", function () {
+  function roundtrip(value, builtins?: BuiltinList) {
+    return deserialize(
+      JSON.parse(JSON.stringify(serialize(value, builtins))),
+      builtins
+    );
+  }
+
+  function expectRoundtrip(value, builtins?: BuiltinList) {

Review Comment:
   @kerrydc it doesn't look like this is used



##########
sdks/typescript/test/combine_test.ts:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import { DirectRunner } from "../src/apache_beam/runners/direct_runner";
+import * as testing from "../src/apache_beam/testing/assert";
+import { KV } from "../src/apache_beam/values";
+
+import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runner";
+import * as combiners from "../src/apache_beam/transforms/combiners";
+import {
+  CombineFn,
+  GroupBy,
+  GroupGlobally,
+  CountPerElement,
+  CountGlobally,
+} from "../src/apache_beam/transforms/group_and_combine";
+
+describe("Apache Beam combiners", function () {
+  it("runs wordcount with a countPerKey transform and asserts the result", async function () {
+    //         await new PortableRunner('localhost:3333').run(

Review Comment:
   The same applies to the duplicate commented-out line in primitives_test and wordcount.ts.





[GitHub] [beam] robertwb merged pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb merged PR #17341:
URL: https://github.com/apache/beam/pull/17341




[GitHub] [beam] robertwb commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r855637864


##########
sdks/typescript/test/combine_test.ts:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import { DirectRunner } from "../src/apache_beam/runners/direct_runner";
+import * as testing from "../src/apache_beam/testing/assert";
+import { KV } from "../src/apache_beam/values";
+
+import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runner";
+import * as combiners from "../src/apache_beam/transforms/combiners";
+import {
+  CombineFn,
+  GroupBy,
+  GroupGlobally,
+  CountPerElement,
+  CountGlobally,
+} from "../src/apache_beam/transforms/group_and_combine";
+
+describe("Apache Beam combiners", function () {
+  it("runs wordcount with a countPerKey transform and asserts the result", async function () {
+    //         await new PortableRunner('localhost:3333').run(

Review Comment:
   Leftover from testing. Removed.





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r859008628


##########
sdks/typescript/src/apache_beam/internal/pipeline.ts:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import equal from "fast-deep-equal";
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import {
+  PTransform,
+  AsyncPTransform,
+  extractName,
+} from "../transforms/transform";
+import { GlobalWindows } from "../transforms/windowings";
+import * as pvalue from "../pvalue";
+import { WindowInto } from "../transforms/window";
+import * as environments from "./environments";
+import { Coder, globalRegistry as globalCoderRegistry } from "../coders/coders";
+
+type Components = runnerApi.Components | fnApi.ProcessBundleDescriptor;
+
+// TODO: Cleanup. Where to put this.
+export class PipelineContext {
+  counter: number = 0;
+
+  private coders: { [key: string]: Coder<any> } = {};
+
+  constructor(public components: Components) {}
+
+  getCoder<T>(coderId: string): Coder<T> {
+    const this_ = this;
+    if (this.coders[coderId] == undefined) {
+      const coderProto = this.components.coders[coderId];
+      const coderConstructor = globalCoderRegistry().get(coderProto.spec!.urn);
+      const components = (coderProto.componentCoderIds || []).map(
+        this_.getCoder.bind(this_)
+      );
+      if (coderProto.spec!.payload && coderProto.spec!.payload.length) {

Review Comment:
   ```suggestion
         if (coderProto.spec!.payload?.length) {
   ```
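
The suggestion relies on optional chaining evaluating to `undefined` when `payload` is missing, which is falsy just like the explicit `payload && payload.length` check. A minimal sketch follows, assuming a hypothetical, simplified `Spec` interface rather than the generated proto types:

```typescript
// Hypothetical, simplified shape standing in for a coder proto's spec.
interface Spec {
  urn: string;
  payload?: Uint8Array;
}

// The original, explicit form of the check.
function hasPayloadExplicit(spec: Spec): boolean {
  return Boolean(spec.payload && spec.payload.length);
}

// The suggested form using optional chaining.
function hasPayloadOptional(spec: Spec): boolean {
  return Boolean(spec.payload?.length);
}

// Missing, empty, and non-empty payloads (the URNs are made up).
const cases: Spec[] = [
  { urn: "example:missing" },
  { urn: "example:empty", payload: new Uint8Array(0) },
  { urn: "example:full", payload: new Uint8Array([1, 2, 3]) },
];

for (const spec of cases) {
  console.log(spec.urn, hasPayloadExplicit(spec), hasPayloadOptional(spec));
}
```

Both forms agree on all three cases; only the non-empty payload counts as present.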



##########
sdks/typescript/webpack.config.js:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   @kerrydc bumping Robert's question from the spreadsheet here - do we need this file? It looks like the tsconfig already handles the ts-closure-transform usage, and I don't think this file is actually being used at the moment.





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r862071859


##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   I won't push further to change this since it's not logically incorrect, but if we enter the first `if`, we set the `bundleId` entry to `new Map()`. That in turn means the second `if` always evaluates to false whenever the first one evaluates to true.
   
   Basically, the two versions are logically equivalent, but we could skip the extra `if` check ¯\_(ツ)_/¯
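
One way to express that observation is a get-or-create on the outer map with the duplicate check moved into an `else` branch. This is only a sketch: `ConsumerRegistry`, `Consumer`, and `lookup` are hypothetical names, and throwing on a duplicate registration is an assumption, since the body of the second `if` is not shown in the quoted diff.

```typescript
// Hypothetical, simplified stand-in for the IDataChannel interface.
interface Consumer {
  name: string;
}

class ConsumerRegistry {
  private consumers: Map<string, Map<string, Consumer>> = new Map();

  register(bundleId: string, transformId: string, consumer: Consumer): void {
    let perBundle = this.consumers.get(bundleId);
    if (perBundle === undefined) {
      // A freshly created map cannot contain transformId, so no lookup is needed.
      perBundle = new Map();
      this.consumers.set(bundleId, perBundle);
    } else if (perBundle.has(transformId)) {
      // Assumption: a duplicate registration is treated as an error.
      throw new Error(`Consumer already registered: ${bundleId}/${transformId}`);
    }
    perBundle.set(transformId, consumer);
  }

  lookup(bundleId: string, transformId: string): Consumer | undefined {
    return this.consumers.get(bundleId)?.get(transformId);
  }
}

const registry = new ConsumerRegistry();
registry.register("bundle-1", "transform-a", { name: "first" });
registry.register("bundle-1", "transform-b", { name: "second" });
```

The duplicate check now runs only when the bundle already has an entry, which is the only case where it can succeed.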



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   SGTM





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858754251


##########
sdks/typescript/src/apache_beam/examples/wordcount2.ts:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+
+import * as beam from "../../apache_beam";
+import * as textio from "../io/textio";
+import { DirectRunner } from "../runners/direct_runner";
+
+import { count } from "../transforms/combiners";
+import { GroupBy } from "../transforms/group_and_combine";
+
+import { PortableRunner } from "../runners/portable_runner/runner";
+
+class CountElements extends beam.PTransform<
+  beam.PCollection<any>,
+  beam.PCollection<any>
+> {
+  expand(input: beam.PCollection<any>) {
+    return input
+      .map((e) => ({ element: e }))
+      .apply(new GroupBy("element").combining("element", count, "count"));
+  }
+}
+
+function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
+  return lines
+    .map((s: string) => s.toLowerCase())
+    .flatMap(function* (line: string) {
+      yield* line.split(/[^a-z]+/);
+    })
+    .apply(new CountElements("Count"));
+}
+
+async function main() {
+  // python apache_beam/runners/portability/local_job_service_main.py --port 3333
+  await new PortableRunner("localhost:3333").run(async (root) => {
+    const lines = await root.asyncApply(
+      new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")

Review Comment:
   Can we give this file (and wordcount3.ts) more descriptive names? Maybe something like `wordcount_textio.ts` and `wordcount_xlang.ts`?





[GitHub] [beam] robertwb commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r855639021


##########
sdks/typescript/test/core_test.ts:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import * as assert from "assert";
+import { BytesCoder } from "../src/apache_beam/coders/standard_coders";
+import { Pipeline } from "../src/apache_beam/internal/pipeline";
+// TODO(pabloem): Fix installation.

Review Comment:
   I went ahead and removed this file--it's entirely redundant with `primitives_test.ts` (and uses an older style from before we could actually run pipelines to verify they were constructed correctly).



##########
sdks/typescript/test/core_test.ts:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import * as assert from "assert";
+import { BytesCoder } from "../src/apache_beam/coders/standard_coders";
+import { Pipeline } from "../src/apache_beam/internal/pipeline";
+// TODO(pabloem): Fix installation.
+
+describe("core module", function () {
+  describe("runs a basic impulse expansion", function () {
+    it("runs a basic Impulse expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p).apply(new beam.Impulse());
+
+      assert.equal(res.type, "pcollection");
+      assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder());
+    });
+    it("runs a ParDo expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p)
+        .apply(new beam.Impulse())
+        .map(function (v: any) {
+          return v * 2;
+        });

Review Comment:
   Removed.





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r859006987


##########
sdks/typescript/src/apache_beam/internal/serialize.ts:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as serialize_closures from "serialize-closures";
+import Long from "long";
+
+import { requireForSerialization, registeredObjects } from "../serialization";
+
+const BIGINT_PREFIX = ":bigint:";
+
+const generator = function* () {};
+
+requireForSerialization("apache_beam", {
+  generator: generator,
+  generator_prototype: generator.prototype,
+  TextEncoder: TextEncoder,
+  TextDecoder: TextDecoder,
+  Long,
+});
+
+export function serializeFn(obj: unknown): Uint8Array {
+  return new TextEncoder().encode(
+    JSON.stringify(
+      serialize_closures.serialize(
+        obj,
+        serialize_closures.defaultBuiltins.concat(registeredObjects)
+      ),
+      (key, value) =>
+        typeof value === "bigint" ? `${BIGINT_PREFIX}${value}` : value
+    )
+  );
+}
+
+export function deserializeFn(s: Uint8Array): any {
+  return serialize_closures.deserialize(
+    JSON.parse(new TextDecoder().decode(s), (key, value) =>
+      typeof value === "string" && value.startsWith(BIGINT_PREFIX)
+        ? BigInt(value.substr(BIGINT_PREFIX.length))
+        : value
+    ),
+    serialize_closures.defaultBuiltins.concat(registeredObjects)
+  );
+}
+
+let fakeSerializeCounter = 0;
+const fakeSerializeMap = new Map<string, any>();
+
+function fakeSeralize(obj) {
+  fakeSerializeCounter += 1;
+  const id = "s_" + fakeSerializeCounter;
+  fakeSerializeMap.set(id, obj);
+  return new TextEncoder().encode(id);
+}
+
+function fakeDeserialize(s) {
+  return fakeSerializeMap.get(new TextDecoder().decode(s));
+}

Review Comment:
   ```suggestion
   ```
   
   None of this is used AFAIK, and even if it is, it should probably live in a separate file for mocks.





[GitHub] [beam] robertwb commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r861994013


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {

Review Comment:
   This doesn't return the operator.



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),

Review Comment:
   Yes. Done.



##########
sdks/typescript/src/apache_beam/examples/wordcount2.ts:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+
+import * as beam from "../../apache_beam";
+import * as textio from "../io/textio";
+import { DirectRunner } from "../runners/direct_runner";
+
+import { count } from "../transforms/combiners";
+import { GroupBy } from "../transforms/group_and_combine";
+
+import { PortableRunner } from "../runners/portable_runner/runner";
+
+class CountElements extends beam.PTransform<
+  beam.PCollection<any>,
+  beam.PCollection<any>
+> {
+  expand(input: beam.PCollection<any>) {
+    return input
+      .map((e) => ({ element: e }))
+      .apply(new GroupBy("element").combining("element", count, "count"));
+  }
+}
+
+function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
+  return lines
+    .map((s: string) => s.toLowerCase())
+    .flatMap(function* (line: string) {
+      yield* line.split(/[^a-z]+/);
+    })
+    .apply(new CountElements("Count"));
+}
+
+async function main() {
+  // python apache_beam/runners/portability/local_job_service_main.py --port 3333
+  await new PortableRunner("localhost:3333").run(async (root) => {
+    const lines = await root.asyncApply(
+      new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")

Review Comment:
   Done.



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   This is a two-level mapping; the first `if` makes sure the first level exists and the second checks the second level.
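
For illustration, the two-level lookup pattern can be sketched in isolation. This is a minimal sketch, not code from the PR: the helper `getOrCreateInner`, the string-valued consumers, and the ids `bundle-1` and `transform-a` are all hypothetical stand-ins.

```typescript
// Hypothetical helper (not part of the PR): returns the inner map for a
// bundle, creating it on first use so callers never see `undefined`.
function getOrCreateInner<V>(
  outer: Map<string, Map<string, V>>,
  bundleId: string
): Map<string, V> {
  let inner = outer.get(bundleId);
  if (inner === undefined) {
    inner = new Map<string, V>();
    outer.set(bundleId, inner);
  }
  return inner;
}

// Stand-in for the real consumers map; values are plain strings here.
const consumers = new Map<string, Map<string, string>>();

// First level: make sure the per-bundle map exists.
const perBundle = getOrCreateInner(consumers, "bundle-1");

// Second level: check whether this transform already has an entry.
const alreadyRegistered = perBundle.has("transform-a");
perBundle.set("transform-a", "consumer");
```

Pulling the first-level check into a helper would also remove the non-null assertions (`!`) on `get(bundleId)`, at the cost of one more function.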



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(

Review Comment:
   As I read the documentation, `spawnSync` waits until the process has completed, which is not what we want here.
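
A small sketch of the difference, using Node's `child_process` module. The child commands below are hypothetical placeholders (Node itself running a one-line script), not the service command from the PR.

```typescript
import { spawn, spawnSync } from "child_process";

// spawnSync blocks the caller until the child has exited, so it only
// suits short-lived commands. Here the child exits at once with code 3.
const finished = spawnSync(process.execPath, ["-e", "process.exit(3)"]);
const exitStatus = finished.status;

// spawn returns a ChildProcess handle immediately while the child keeps
// running, which is the behavior a long-lived service needs.
const server = spawn(process.execPath, ["-e", "setInterval(() => {}, 1000)"]);
const stillRunning = server.exitCode === null;

// Stop the placeholder child so this example terminates.
server.kill();
```

With `spawnSync`, `start()` would not return until the service itself had shut down.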



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {
+      await (
+        this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+      ).flush(consumer);
+    }
+    this.consumers.get(bundleId)!.set(transformId, consumer);
+  }
+
+  unregisterConsumer(bundleId: string, transformId: string) {
+    this.consumers.get(bundleId)!.delete(transformId);
+  }
+
+  getConsumer(bundleId: string, transformId: string): IDataChannel {
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (!this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   Same.



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   The payloads are different as well. I'm going to leave this as is, as there's also value in making this quasi-object-literal very transparent.



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promises and awaiting them as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type: " + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+  registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+    return new cls(transformId, transformProto, context);
+  });
+}
+
+export function registerOperatorConstructor(
+  urn: string,
+  constructor: OperatorConstructor
+) {
+  operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// for potential SDK authors that are less familiar with JavaScript.
+
+class DataSourceOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  receiver: Receiver;
+  coder: Coder<WindowedValue<unknown>>;
+  endOfData: Promise<void>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      readPort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    this.coder = context.pipelineContext.getCoder(readPort.coderId);
+  }
+
+  async startBundle() {
+    const this_ = this;
+    var endOfDataResolve, endOfDataReject;
+    this.endOfData = new Promise((resolve, reject) => {
+      endOfDataResolve = resolve;
+      endOfDataReject = reject;
+    });
+
+    await this_.multiplexingDataChannel.registerConsumer(
+      this_.getBundleId(),
+      this_.transformId,
+      {
+        sendData: async function (data: Uint8Array) {
+          console.log("Got", data);
+          const reader = new protobufjs.Reader(data);
+          while (reader.pos < reader.len) {
+            const maybePromise = this_.receiver.receive(
+              this_.coder.decode(reader, CoderContext.needsDelimiters)
+            );
+            if (maybePromise != NonPromise) {
+              await maybePromise;
+            }
+          }
+        },
+        sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+          throw Error("Not expecting timers.");
+        },
+        close: function () {
+          endOfDataResolve();
+        },
+        onError: function (error: Error) {
+          endOfDataReject(error);
+        },
+      }
+    );
+  }
+
+  process(wvalue: WindowedValue<unknown>): ProcessResult {
+    throw Error("Data should not come in via process.");
+  }
+
+  async finishBundle() {
+    try {
+      await this.endOfData;
+    } finally {
+      this.multiplexingDataChannel.unregisterConsumer(
+        this.getBundleId(),
+        this.transformId
+      );
+    }
+  }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  channel: IDataChannel;
+  coder: Coder<WindowedValue<unknown>>;
+  buffer: protobufjs.Writer;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      writePort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.coder = context.pipelineContext.getCoder(writePort.coderId);
+  }
+
+  async startBundle() {
+    this.channel = this.multiplexingDataChannel.getSendChannel(
+      this.getBundleId(),
+      this.transformId
+    );
+    this.buffer = new protobufjs.Writer();
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+    if (this.buffer.len > 1e6) {
+      return this.flush();
+    }
+    return NonPromise;
+  }
+
+  async finishBundle() {
+    await this.flush();
+    this.channel.close();
+  }
+
+  async flush() {
+    if (this.buffer.len > 0) {
+      await this.channel.sendData(this.buffer.finish());
+      this.buffer = new protobufjs.Writer();
+    }
+  }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+  receiver: Receiver;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+  }
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+  private doFn: DoFn<unknown, unknown, unknown>;
+  private getStateProvider: () => StateProvider;
+  private sideInputInfo: Map<string, SideInputInfo> = new Map();
+  private originalContext: object | undefined;
+  private augmentedContext: object | undefined;
+  private paramProvider: ParamProviderImpl;
+
+  constructor(
+    private transformId: string,
+    private receiver: Receiver,
+    private spec: runnerApi.ParDoPayload,
+    private payload: {
+      doFn: DoFn<unknown, unknown, unknown>;
+      context: any;
+    },
+    transformProto: runnerApi.PTransform,
+    operatorContext: OperatorContext
+  ) {
+    this.doFn = payload.doFn;
+    this.originalContext = payload.context;
+    this.getStateProvider = operatorContext.getStateProvider;
+    this.sideInputInfo = createSideInputInfo(
+      transformProto,
+      spec,
+      operatorContext
+    );
+  }
+
+  async startBundle() {
+    this.paramProvider = new ParamProviderImpl(
+      this.transformId,
+      this.sideInputInfo,
+      this.getStateProvider
+    );
+    this.augmentedContext = this.paramProvider.augmentContext(
+      this.originalContext
+    );
+    if (this.doFn.startBundle) {
+      this.doFn.startBundle(this.augmentedContext);
+    }
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    if (this.augmentedContext && wvalue.windows.length != 1) {
+      // We need to process each window separately.
+      // TODO: (Perf) We could inspect the context more deeply and allow some
+      // cases to go through.
+      const result = new ProcessResultBuilder();
+      for (const window of wvalue.windows) {
+        result.add(
+          this.process({
+            value: wvalue.value,
+            windows: [window],
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      return result.build();
+    }
+
+    const this_ = this;
+    function reallyProcess(): ProcessResult {
+      const doFnOutput = this_.doFn.process(
+        wvalue.value,
+        this_.augmentedContext
+      );
+      if (!doFnOutput) {
+        return NonPromise;
+      }
+      const result = new ProcessResultBuilder();
+      for (const element of doFnOutput) {
+        result.add(
+          this_.receiver.receive({
+            value: element,
+            windows: wvalue.windows,
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      this_.paramProvider.update(undefined);
+      return result.build();
+    }
+
+    // Update the context with any information specific to this window.
+    const updateContextResult = this.paramProvider.update(wvalue);
+
+    // If we were able to do so without any deferred actions, process the
+    // element immediately.
+    if (updateContextResult == NonPromise) {
+      return reallyProcess();
+    } else {
+      // Otherwise return a promise that first waits for all the deferred
+      // actions to complete and then process the element.
+      return (async () => {
+        await updateContextResult;
+        const update2 = this.paramProvider.update(wvalue);
+        if (update2 != NonPromise) {
+          throw new Error("Expected all promises to be resolved: " + update2);
+        }
+        await reallyProcess();
+      })();
+    }
+  }
+
+  async finishBundle() {
+    if (this.doFn.finishBundle) {
+      const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+      if (!finishBundleOutput) {
+        return;
+      }
+      // The finishBundle method must return `void` or a Generator<WindowedValue<OutputT>>. It may not
+      // return Generator<OutputT> without windowing information because a single bundle may contain
+      // elements from different windows, so each element must specify its window.
+      for (const element of finishBundleOutput) {
+        const maybePromise = this.receiver.receive(element);
+        if (maybePromise != NonPromise) {
+          await maybePromise;
+        }
+      }
+    }
+  }
+}
+
+class IdentityParDoOperator implements IOperator {
+  constructor(private receiver: Receiver) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+  constructor(
+    private splitter: (any) => string,
+    private receivers: { [key: string]: Receiver }
+  ) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const tag = this.splitter(wvalue.value);
+    const receiver = this.receivers[tag];
+    if (receiver) {
+      return receiver.receive(wvalue);
+    } else {
+      // TODO: (API) Make this configurable.
+      throw new Error(
+        "Unexpected tag '" +
+          tag +
+          "' for " +
+          wvalue.value +
+          " not in " +
+          [...Object.keys(this.receivers)]
+      );
+    }
+  }
+
+  async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+  constructor(private receivers: { [key: string]: Receiver }) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const result = new ProcessResultBuilder();
+    // TODO: (API) Should this require exactly one tag instead of allowing a union?
+    for (const tag of Object.keys(wvalue.value as object)) {
+      const receiver = this.receivers[tag];
+      if (receiver) {
+        result.add(
+          receiver.receive({
+            value: (wvalue.value as object)[tag],
+            windows: wvalue.windows,
+            timestamp: wvalue.timestamp,
+            pane: wvalue.pane,
+          })
+        );
+      } else {
+        // TODO: (API) Make this configurable.
+        throw new Error(
+          "Unexpected tag '" +
+            tag +
+            "' for " +
+            wvalue.value +
+            " not in " +
+            [...Object.keys(this.receivers)]
+        );
+      }
+    }
+    return result.build();
+  }
+
+  async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+    if (newWindowsOnce.length > 0) {
+      const newWindows: Window[] = [];
+      for (var i = 0; i < wvalue.windows.length; i++) {
+        newWindows.push(...newWindowsOnce);

Review Comment:
   That is correct. Added a comment to clarify.
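
To illustrate the behaviour confirmed above: in the visible part of the hunk, the windows returned by `windowFn.assignWindows` depend only on the element's timestamp, and the loop appends that list once per window the element already carries. Below is a minimal, self-contained sketch of that expansion. `SimpleWindow`, `fixedWindows`, and `reassignWindows` are hypothetical stand-ins invented for illustration, not names from the SDK, and the sketch covers only the part of the loop that is quoted here.

```typescript
// Hypothetical stand-in for the SDK's Window type.
interface SimpleWindow {
  start: number;
  end: number;
}

// Hypothetical fixed-size window function: one window per timestamp.
function fixedWindows(size: number): (timestamp: number) => SimpleWindow[] {
  return (timestamp) => {
    const start = Math.floor(timestamp / size) * size;
    return [{ start, end: start + size }];
  };
}

// Mirrors the quoted loop: the new windows are computed once from the
// timestamp and then repeated once per window the element was already in.
function reassignWindows(
  existingWindows: SimpleWindow[],
  timestamp: number,
  assignWindows: (timestamp: number) => SimpleWindow[]
): SimpleWindow[] {
  const newWindowsOnce = assignWindows(timestamp);
  const newWindows: SimpleWindow[] = [];
  for (let i = 0; i < existingWindows.length; i++) {
    newWindows.push(...newWindowsOnce);
  }
  return newWindows;
}

// An element that sat in two windows ends up with two copies of the new window.
const reassigned = reassignWindows(
  [
    { start: 0, end: 100 },
    { start: 50, end: 150 },
  ],
  72,
  fixedWindows(10)
);
console.log(reassigned);
```

With two incoming windows and a window function that yields one window per timestamp, the result holds that one window twice, so the number of (element, window) pairs is unchanged by the reassignment.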



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        cacheDir,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+            // Do not pipe an error response into the cache file.
+            return;
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";

Review Comment:
   Yep. Leftover from debugging. Removed.
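
For reference, with the hard-coded override gone, the remaining mapping in the hunk reduces to the sketch below. `beamReleaseFor` is a hypothetical name used only for this illustration, and the version strings are example inputs rather than versions taken from the PR.

```typescript
// Hypothetical helper: maps an SDK version such as "0.38.0" to the Beam
// release it tracks ("2.38.0"). Any other version string is returned unchanged.
function beamReleaseFor(sdkVersion: string): string {
  if (sdkVersion.startsWith("0.")) {
    // Replace the leading "0" with "2", keeping the rest of the version.
    return "2" + sdkVersion.substring(1);
  }
  return sdkVersion;
}

console.log(beamReleaseFor("0.38.0"));
console.log(beamReleaseFor("2.36.0"));
```

Keeping the mapping in one small pure function would also make it easy to unit test, so that a debugging override like the one removed here shows up as a test failure.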



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);

Review Comment:
   Good call on both fronts. Done.



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);
+      }
+    });
+    this.controlChannel.on("end", () => {
+      console.log("Control channel closed.");
+      for (const dataChannel of this.dataChannels.values()) {
+        dataChannel.close();
+      }
+      for (const stateChannel of this.stateChannels.values()) {
+        stateChannel.close();
+      }
+    });
+  }
+
+  async wait() {
+    // TODO: Await closing of the control channel.
+    await new Promise((r) => setTimeout(r, 1e9));
+  }
+
+  respond(response: InstructionResponse) {
+    this.controlChannel.write(response);
+  }
+
+  async process(request) {
+    const descriptorId =
+      request.request.processBundle.processBundleDescriptorId;
+    console.log("process", request.instructionId, descriptorId);
+    try {
+      if (!this.processBundleDescriptors.has(descriptorId)) {
+        const call = this.controlClient.getProcessBundleDescriptor(
+          {
+            processBundleDescriptorId: descriptorId,
+          },
+          (err, value: ProcessBundleDescriptor) => {
+            if (err) {
+              this.respond({
+                instructionId: request.instructionId,
+                error: "" + err,
+                response: {
+                  oneofKind: "processBundle",
+                  processBundle: {
+                    residualRoots: [],
+                    monitoringInfos: [],
+                    requiresFinalization: false,
+                    monitoringData: {},
+                  },
+                },
+              });
+            } else {
+              this.processBundleDescriptors.set(descriptorId, value);
+              this.process(request);
+            }
+          }
+        );
+        return;
+      }
+
+      const processor = this.aquireBundleProcessor(descriptorId);
+      await processor.process(request.instructionId);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "",
+        response: {
+          oneofKind: "processBundle",
+          processBundle: {
+            residualRoots: [],
+            monitoringInfos: [],
+            requiresFinalization: false,
+            monitoringData: {},
+          },
+        },
+      });
+      this.returnBundleProcessor(processor);
+    } catch (error) {
+      console.error("PROCESS ERROR", error);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "" + error,
+        response: { oneofKind: undefined },
+      });
+    }
+  }
+
+  aquireBundleProcessor(descriptorId: string) {
+    if (!this.bundleProcessors.has(descriptorId)) {
+      this.bundleProcessors.set(descriptorId, []);
+    }
+    const processor = this.bundleProcessors.get(descriptorId)?.pop();
+    if (processor != undefined) {
+      return processor;
+    } else {
+      return new BundleProcessor(
+        this.processBundleDescriptors.get(descriptorId)!,
+        this.getDataChannel.bind(this),
+        this.getStateChannel.bind(this)
+      );
+    }
+  }
+
+  returnBundleProcessor(processor: BundleProcessor) {
+    this.bundleProcessors.get(processor.descriptor.id)?.push(processor);

Review Comment:
   Yes. Done.
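
The acquire/return pair in the quoted hunk amounts to a free list of idle processors per descriptor id. A minimal sketch of that pattern follows, assuming nothing about the final code in the PR. `KeyedPool`, `acquire`, and `release` are hypothetical names, and unlike the hunk's `?.push`, this version creates the list on release if it does not exist yet.

```typescript
// Hypothetical generic pool of reusable objects, keyed by descriptor id.
class KeyedPool<T> {
  private idle: Map<string, T[]> = new Map();

  constructor(private create: (key: string) => T) {}

  // Reuse an idle item for this key if there is one, otherwise build a new one.
  acquire(key: string): T {
    const cached = this.idle.get(key)?.pop();
    return cached !== undefined ? cached : this.create(key);
  }

  // Hand an item back so a later acquire() for the same key can reuse it.
  release(key: string, item: T): void {
    let items = this.idle.get(key);
    if (items === undefined) {
      items = [];
      this.idle.set(key, items);
    }
    items.push(item);
  }
}

let created = 0;
const pool = new KeyedPool<{ id: number }>(() => ({ id: ++created }));

const first = pool.acquire("descriptor-a");
pool.release("descriptor-a", first);
const second = pool.acquire("descriptor-a"); // reuses `first`
const third = pool.acquire("descriptor-a"); // pool is empty again, so a new item
console.log(first === second, third.id, created);
```

Note that in the quoted `process` method a processor is only returned on the success path. Whether a processor that threw should be discarded or reset and reused is a design choice this sketch does not address.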



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      // Write into the requested cacheDir (the directory created above),
+      // not unconditionally into the default JAR_CACHE.
+      const dest = path.join(cacheDir, path.basename(urlOrPath));
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+            // Do not pipe (and then cache) the body of a failed response.
+            return;
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";
+    const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
+    const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
+    const projectRoot = path.resolve(
+      __dirname,
+      "..",
+      "..",
+      "..",
+      "..",
+      "..",
+      ".."
+    );

Review Comment:
   I agree. For now I added a TODO.
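
The two string manipulations in `gradleToJar` above (deriving the Maven artifact id from a Gradle target, and mapping an SDK `0.x` version to a Beam `2.x` version) can be sketched in isolation. This is a minimal illustration, not the PR's code: the function names `gradleTargetToArtifactId` and `sdkVersionToBeamVersion` are hypothetical, the example target string is made up, and the sketch ignores the hard-coded `version = "2.36.0"` override in the quoted hunk.

```typescript
// Hypothetical helper: same regular expression as the quoted gradleToJar,
// but with an explicit error instead of a non-null assertion.
function gradleTargetToArtifactId(gradleTarget: string): string {
  const match = gradleTarget.match(/^:?(.*):[^:]+:?$/);
  if (match === null) {
    throw new Error("Unrecognized gradle target: " + gradleTarget);
  }
  // Everything before the final task name is the Gradle package;
  // colons become dashes in the artifact id.
  return "beam-" + match[1].split(":").join("-");
}

// Hypothetical helper: SDK version 0.x corresponds to Beam 2.x.
function sdkVersionToBeamVersion(version: string): string {
  return version.startsWith("0.") ? "2" + version.substring(1) : version;
}
```

Splitting the logic out this way would make the mapping testable without touching the filesystem or the network.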



##########
sdks/typescript/boot.go:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+)
+
+var (
+	// Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+	id                = flag.String("id", "", "Local identifier (required).")
+	loggingEndpoint   = flag.String("logging_endpoint", "", "Local logging endpoint for FnHarness (required).")
+	artifactEndpoint  = flag.String("artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).")
+	provisionEndpoint = flag.String("provision_endpoint", "", "Local provision endpoint for FnHarness (required).")
+	controlEndpoint   = flag.String("control_endpoint", "", "Local control endpoint for FnHarness (required).")
+	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+const entrypoint = "dist/worker/worker_main.js"
+
+func main() {
+	flag.Parse()
+	if *id == "" {
+		log.Fatal("No id provided.")
+	}
+	if *provisionEndpoint == "" {
+		log.Fatal("No provision endpoint provided.")
+	}
+
+	ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+	info, err := provision.Info(ctx, *provisionEndpoint)
+	if err != nil {
+		log.Fatalf("Failed to obtain provisioning information: %v", err)
+	}
+	log.Printf("Provision info:\n%v", info)
+
+	// TODO(BEAM-8201): Simplify once flags are no longer used.

Review Comment:
   I'm not sure if all the cleanup has been done on the Dataflow side. @ihji 



##########
sdks/typescript/README.md:
##########
@@ -0,0 +1,208 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Typescript Beam SDK
+
+This is the start of a fully functioning Javascript (actually, Typescript) SDK.
+There are two distinct aims with this SDK:
+
+1. Tap into the large (and relatively underserved, by existing data processing
+frameworks) community of javascript developers with a native SDK targeting this language.
+
+1. Develop a new SDK which can serve both as a proof of concept and reference
+that highlights the (relative) ease of porting Beam to new languages,
+a differentiating feature of Beam and Dataflow.
+
+To accomplish this, we lean heavily on the portability framework.
+For example, we make heavy use of cross-language transforms,
+in particular for IOs.
+In addition, the direct runner is simply an extension of the worker suitable
+for running on portable runners such as the ULR, which will directly transfer
+to running on production runners such as Dataflow and Flink.
+The target audience should hopefully not be put off by running other-language
+code encapsulated in docker images.
+
+## API
+
+We generally try to apply the concepts from the Beam API in a Typescript
+idiomatic way, but it should be noted that few of the initial developers
+have extensive (if any) Javascript/Typescript development experience, so
+feedback is greatly appreciated.
+
+In addition, some notable departures are taken from the traditional SDKs:
+
+* We take a "relational foundations" approach, where
+[schema'd data](https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf)
+is the primary way to interact with data, and we generally eschew the key-value
+requiring transforms in favor of a more flexible approach naming fields or
+expressions. Javascript's native Object is used as the row type.
+
+* As part of being schema-first we also de-emphasize Coders as a first-class
+concept in the SDK, relegating them to an advanced feature used for interop.
+Though we can infer schemas from individual elements, it is still TBD to
+figure out if/how we can leverage the type system and/or function introspection
+to regularly infer schemas at construction time. A fallback coder using BSON
+encoding is used when we don't have sufficient type information.
+
+* We have added additional methods to the PCollection object, notably `map`
+and `flatMap`, [rather than only allowing apply](https://www.mail-archive.com/dev@beam.apache.org/msg06035.html).
+In addition, `apply` can accept a function argument `(PCollection) => ...` as
+well as a PTransform subclass, which treats this callable as if it were a
+PTransform's expand.
+
+* In the other direction, we have eliminated the
+[problematic Pipeline object](https://s.apache.org/no-beam-pipeline)
+from the API, instead providing a `Root` PValue on which pipelines are built,
+and invoking run() on a Runner.  We offer a less error-prone `Runner.run`
+which finishes only when the pipeline is completely finished as well as
+`Runner.runAsync` which returns a handle to the running pipeline.
+
+* Rather than introduce PCollectionTuple, PCollectionList, etc. we let PValue
+literally be an
+[array or object with PValue values](https://github.com/robertwb/beam-javascript/blob/de4390dd767f046903ac23fead5db333290462db/sdks/node-ts/src/apache_beam/pvalue.ts#L116)
+which transforms can consume or produce.
+These are applied by wrapping them with the `P` operator, e.g.
+`P([pc1, pc2, pc3]).apply(new Flatten())`.
+
+* Like Python, `flatMap` and `ParDo.process` return multiple elements by
+yielding them from a generator, rather than invoking a passed-in callback.
+TBD how to output to multiple distinct PCollections.
+There is currently an operation to split a PCollection into multiple
+PCollections based on the properties of the elements, and
+we may consider using a callback for side outputs.
+
+* The `map`, `flatMap`, and `ParDo.process` methods take an additional
+(optional) context argument, which is similar to the keyword arguments
+used in Python. These can be "ordinary" javascript objects (which are passed
+as is) or special DoFnParam objects which provide getters to element-specific
+information (such as the current timestamp, window, or side input) at runtime.
+
+* Javascript supports (and encourages) an asynchronous programming model, with
+many libraries requiring use of the async/await paradigm.
+As there is no way (by design) to go from the asynchronous style back to
+the synchronous style, this needs to be taken into account
+when designing the API.
+We currently offer asynchronous variants of `PValue.apply(...)` (in addition
+to the synchronous ones, as they are easier to chain) as well as making
+`Runner.run` asynchronous. TBD to do this for all user callbacks as well.
+
+An example pipeline can be found at https://github.com/robertwb/beam-javascript/blob/javascript/sdks/node-ts/src/apache_beam/examples/wordcount.ts
+
+## TODO
+
+This SDK is a work in progress. In January 2022 we developed the ability to
+construct and run basic pipelines (including external transforms and running
+on a portable runner) but the following big-ticket items remain.
+
+* Containerization
+
+  * Function and object serialization: we currently only support "loopback"
+  mode; to be able to run in a remote, distributed manner we need to finish up
+  the work in pickling closures and DoFn objects. Some investigation has been
+  started here, but all existing libraries have non-trivial drawbacks.
+
+  * Finish the work in building a full SDK container image that starts
+  the worker.
+
+  * Actually use worker threads for multiple bundles.
+
+* API
+
+  * There are several TODOs of minor features or design decisions to finalize.
+
+    * Consider using (or supporting) 2-arrays rather than {key, value} objects
+      for KVs.
+
+    * Consider renaming map/flatMap to doMap/doFlatMap to avoid confusion with
+    Array.map that takes a key as a second callback argument.
+    Or force the second argument to be an Object, which would lead to a less
+    confusing API and clean up the implementation.
+    Also add a [do]Filter, and possibly a [do]Reduce?
+
+    * Move away from using classes.
+
+  * Add the ability to set good PTransform names, and ideally infer good
+  defaults.
+
+  * Advanced features like metrics, state, timers, and SDF.
+  Possibly some of these can wait.
+
+* Infrastructure
+
+  * Gradle and Jenkins integration for tests and style enforcement.
+
+* Other
+
+  * Enforce unique names for pipeline update.
+
+  * PipelineOptions should be a Javascript Object, not a proto Struct.
+
+  * Though Dataflow Runner v2 supports portability, submission is still done
+  via v1beta3 and interaction with GCS rather than the job submission API.
+
+  * Cleanup uses of var, this. Arrow functions. `===` vs `==`.
+
+  * Avoid `any` return types (and re-enable check in compiler).
+
+  * Relative vs. absolute imports, possibly via setting a base url with a
+  `jsconfig.json`.  Also remove imports from base.ts.
+
+  * More/better tests, including tests of illegal/unsupported use.
+
+  * Set channel options like `grpc.max_{send,receive}_message_length` as we
+  do in other SDKs.
+
+  * Reduce use of any.
+
+    * Could use `unknown` in its place where the type is truly unknown.
+
+    * It'd be nice to enforce, maybe re-enable `noImplicitAny: true` in
+    tsconfig if we can get the generated proto files to be ignored.
+
+  * Enable a linter like eslint and fix at least the low hanging fruit.
+
+There is probably more; there are many TODOs littered throughout the code.
+
+This code has also not yet been fully peer reviewed (it was the result of a
+hackathon), which needs to be done before putting it into the main repository.
+
+
+## Development.

Review Comment:
   Yes. Excellent.
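
The README's point that `flatMap` and `ParDo.process` "return multiple elements by yielding them from a generator" can be illustrated without the SDK. The sketch below is a stand-in that runs over an in-memory array; `flatMapLocal` and `splitWords` are hypothetical names and are not part of the Beam API.

```typescript
// A generator-based callback: each input line yields zero or more words,
// instead of pushing them into a passed-in output callback.
function* splitWords(line: string): Generator<string> {
  for (const word of line.split(/\s+/)) {
    if (word.length > 0) {
      yield word;
    }
  }
}

// Hypothetical local stand-in for a flatMap-style transform: applies fn to
// every input and concatenates whatever each call yields.
function flatMapLocal<T, U>(inputs: T[], fn: (x: T) => Iterable<U>): U[] {
  const out: U[] = [];
  for (const x of inputs) {
    for (const y of fn(x)) {
      out.push(y);
    }
  }
  return out;
}

const words = flatMapLocal(["to be", "or  not", ""], splitWords);
```

Because the callback only needs to return an `Iterable`, a plain function returning an array would work with the same stand-in, which is one reason the generator style composes well.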



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();

Review Comment:
   I agree. Done.
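
One possible way to address the `TODO: (Cleanup) Choose a free port` in `SubprocessService.start` above is to let the operating system pick a port by listening on port 0. This is only a sketch under that assumption: `findFreePort` is a hypothetical helper, not something in the PR, and it relies on Node's standard `net` module.

```typescript
import * as net from "net";
import { AddressInfo } from "net";

// Hypothetical helper: asks the OS for a currently unused TCP port.
function findFreePort(host: string = "localhost"): Promise<number> {
  return new Promise((resolve, reject) => {
    const server = net.createServer();
    server.on("error", reject);
    // Port 0 tells the OS to assign any available port.
    server.listen(0, host, () => {
      const { port } = server.address() as AddressInfo;
      // Release the port before handing its number to the caller.
      server.close(() => resolve(port));
    });
  });
}
```

Note that there is a short window between closing the probe server and the subprocess binding the port in which another process could claim it, so the existing `portReady` check (and its process-exit check) would still be needed.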



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),
+  extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a < b ? a : b)),

Review Comment:
   Done.
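
As quoted, `max.mergeAccumulators` appears to mishandle two cases: an empty accumulator list (`reduce` without an initial value throws) and a list containing an untouched `undefined` accumulator (`5 > undefined` is `false`, so `undefined` wins). A minimal sketch of an `undefined`-aware variant follows; the `CombineFn` interface here is an assumed stand-in for the one imported from `./group_and_combine`, and `safeMax` is a hypothetical name.

```typescript
// Assumed shape of CombineFn, reduced to the four methods used in the diff.
interface CombineFn<I, A, O> {
  createAccumulator: () => A;
  addInput: (acc: A, input: I) => A;
  mergeAccumulators: (accumulators: A[]) => A;
  extractOutput: (acc: A) => O;
}

const safeMax: CombineFn<number, number | undefined, number | undefined> = {
  createAccumulator: () => undefined,
  addInput: (acc, i) => (acc === undefined || acc < i ? i : acc),
  mergeAccumulators: (accumulators) =>
    // Seed with undefined so an empty list is valid, and skip accumulators
    // that never saw an input.
    accumulators.reduce<number | undefined>(
      (a, b) => (a === undefined ? b : (b === undefined || a > b ? a : b)),
      undefined
    ),
  extractOutput: (acc) => acc,
};
```

The same pattern with the comparison flipped would apply to `min`; `count` and `sum` could pass `0` as the initial value to `reduce` for the empty case.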



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r865340404


##########
sdks/typescript/test/serialize_test.ts:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { expect } from "chai";
+import {
+  deserialize,
+  serialize,
+  BuiltinList,
+  generateDefaultBuiltins,
+} from "serialize-closures";
+
+describe("serialization tests", function () {
+  function roundtrip(value, builtins?: BuiltinList) {
+    return deserialize(
+      JSON.parse(JSON.stringify(serialize(value, builtins))),
+      builtins
+    );
+  }
+
+  function expectRoundtrip(value, builtins?: BuiltinList) {
+    expect(roundtrip(value, builtins)).to.deep.equal(value);
+  }
+
+  function* myGenerator() {
+    yield 42;
+    yield 84;
+  }
+
+  function simpleGenerator() {
+    // next() returns an iterator result { value, done }, so compare .value.
+    expect(myGenerator().next().value).to.equal(42);
+  }
+
+  function roundtripGeneratorConstructor() {
+    expect(roundtrip(myGenerator)().next().value).to.equal(42);
+  }
+
+  function roundtripGeneratorInProgress() {

Review Comment:
   Updated this file to run tests of our actual serialization.





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r865314517


##########
sdks/typescript/webpack.config.js:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   SGTM





[GitHub] [beam] robertwb commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r865290685


##########
sdks/typescript/webpack.config.js:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   OK, I'm just going to remove it. We can always get it back from history if we need to.





[GitHub] [beam] robertwb commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r855639247


##########
sdks/typescript/test/core_test.ts:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../src/apache_beam";
+import * as assert from "assert";
+import { BytesCoder } from "../src/apache_beam/coders/standard_coders";
+import { Pipeline } from "../src/apache_beam/internal/pipeline";
+// TODO(pabloem): Fix installation.
+
+describe("core module", function () {
+  describe("runs a basic impulse expansion", function () {
+    it("runs a basic Impulse expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p).apply(new beam.Impulse());
+
+      assert.equal(res.type, "pcollection");
+      assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder());
+    });
+    it("runs a ParDo expansion", function () {
+      var p = new Pipeline();
+      var res = new beam.Root(p)
+        .apply(new beam.Impulse())
+        .map(function (v: any) {
+          return v * 2;
+        });
+    });
+    it("runs a GroupBy expansion", function () {});

Review Comment:
   Removed.





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858971147


##########
sdks/typescript/package.json:
##########
@@ -0,0 +1,45 @@
+{
+  "name": "apache_beam",
+  "version": "0.37.0.dev",

Review Comment:
   ```suggestion
     "version": "0.39.0.dev",
   ```





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858965465


##########
sdks/typescript/boot.go:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+)
+
+var (
+	// Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+	id                = flag.String("id", "", "Local identifier (required).")
+	loggingEndpoint   = flag.String("logging_endpoint", "", "Local logging endpoint for FnHarness (required).")
+	artifactEndpoint  = flag.String("artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).")
+	provisionEndpoint = flag.String("provision_endpoint", "", "Local provision endpoint for FnHarness (required).")
+	controlEndpoint   = flag.String("control_endpoint", "", "Local control endpoint for FnHarness (required).")
+	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+const entrypoint = "dist/worker/worker_main.js"
+
+func main() {
+	flag.Parse()
+	if *id == "" {
+		log.Fatal("No id provided.")
+	}
+	if *provisionEndpoint == "" {
+		log.Fatal("No provision endpoint provided.")
+	}
+
+	ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+	info, err := provision.Info(ctx, *provisionEndpoint)
+	if err != nil {
+		log.Fatalf("Failed to obtain provisioning information: %v", err)
+	}
+	log.Printf("Provision info:\n%v", info)
+
+	// TODO(BEAM-8201): Simplify once flags are no longer used.

Review Comment:
   Do we actually need this TODO, and are we planning to do this work? The referenced Jira issue has been resolved for a while (https://issues.apache.org/jira/browse/BEAM-8201).





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858971147


##########
sdks/typescript/package.json:
##########
@@ -0,0 +1,45 @@
+{
+  "name": "apache_beam",
+  "version": "0.37.0.dev",

Review Comment:
   ```suggestion
     "version": "0.38.0.dev",
   ```





[GitHub] [beam] damccorm commented on pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on PR #17341:
URL: https://github.com/apache/beam/pull/17341#issuecomment-1118547622

   Run Python PreCommit




[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858805480


##########
sdks/typescript/src/apache_beam/version.ts:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const path = require("path");
+
+// TODO: (Typescript) Is there a more standard way to do this?
+export const version: string = JSON.parse(
+  fs.readFileSync(path.resolve(__dirname, "..", "..", "..", "package.json"))
+)["version"];

Review Comment:
   ```suggestion
   export const { version } = require('../../../package.json');
   ```
   
   Rather than reading the file at runtime (which gets none of the benefits of the TS compiler's checks), we can load it with `require`.
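
A possible refinement, sketched under assumptions rather than taken from the PR: with the usual Node typings, `require` returns `any`, so the value may still be unchecked however it is loaded. The hypothetical helper `extractVersion` below narrows an untyped value to a string; the inline JSON stands in for the real `package.json` so the sketch runs on its own.

```typescript
// Hypothetical helper, not from the PR: narrows an untyped value, such as the
// result of require("../../../package.json") or JSON.parse, to a version string.
function extractVersion(pkg: unknown): string {
  if (typeof pkg === "object" && pkg !== null) {
    const candidate = (pkg as { version?: unknown }).version;
    if (typeof candidate === "string") {
      return candidate;
    }
  }
  throw new Error('package.json does not contain a string "version" field');
}

// Inline JSON stands in for the real package.json (assumption for this sketch).
const beamVersion: string = extractVersion(
  JSON.parse('{"name": "apache_beam", "version": "0.38.0.dev"}')
);
console.log(beamVersion);
```

If the project enables the `resolveJsonModule` compiler option, a plain `import` of the JSON file is another route to a typed `version` field.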
   





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858810856


##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),

Review Comment:
   Do we need an undefined check here? I think either `a` or `b` could be undefined.
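
A minimal sketch of what such a check might look like, assuming an accumulator stays `undefined` until its first input arrives (as `createAccumulator` above suggests). Comparisons against `undefined` are always false in JavaScript, so the quoted `reduce` would merge `[3, undefined]` to `undefined`. The name `mergeMax` is hypothetical and not part of the PR; it is typed for numbers for brevity, and it passes an explicit initial value to `reduce` so that an empty list does not throw.

```typescript
// Hypothetical helper, not from the PR: merges "max" accumulators while
// treating undefined as "no input seen yet".
function mergeMax(accumulators: (number | undefined)[]): number | undefined {
  return accumulators.reduce<number | undefined>((a, b) => {
    if (a === undefined) return b; // nothing on the left: take the right side
    if (b === undefined) return a; // nothing on the right: keep the left side
    return a > b ? a : b;
  }, undefined);
}

console.log(mergeMax([undefined, 3, undefined, 7, 5])); // 7
console.log(mergeMax([3, undefined])); // 3
```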



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),
+  extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a < b ? a : b)),

Review Comment:
   Same question - do we need to handle undefined?
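
The same guard would apply to `min`. As a sketch under the same assumption (an `undefined` accumulator means no input yet), the hypothetical `mergeMin` below uses a `for...of` loop instead of `reduce`, so it accepts any `Iterable` rather than only arrays, which also touches on the TODO at the top of the file. It is typed for numbers for brevity.

```typescript
// Hypothetical helper, not from the PR: merges "min" accumulators from any
// Iterable, skipping accumulators that never saw an input.
function mergeMin(
  accumulators: Iterable<number | undefined>
): number | undefined {
  let result: number | undefined = undefined;
  for (const acc of accumulators) {
    if (acc === undefined) continue; // empty accumulator: nothing to merge
    if (result === undefined || acc < result) result = acc;
  }
  return result;
}

console.log(mergeMin([4, undefined, 2, 9])); // 2
console.log(mergeMin(new Set([undefined, 8]))); // 8
```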



##########
sdks/typescript/src/apache_beam/transforms/group_and_combine.ts:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { KV } from "../values";
+import { PTransform } from "./transform";
+import { PCollection } from "../pvalue";
+import * as internal from "./internal";
+import { count } from "./combiners";
+
+// TODO: (API) Consider groupBy as a top-level method on PCollections.
+// TBD how to best express the combiners.
+//     - Idea 1: We could allow these as extra arguments to groupBy
+//     - Idea 2: We could return a special GroupedPCollection that has a nice,
+//               chain-able combining() method. We'd want the intermediates to
+//               still be usable, but lazy.
+
+export interface CombineFn<I, A, O> {
+  createAccumulator: () => A;
+  addInput: (A, I) => A;
+  mergeAccumulators: (accumulators: Iterable<A>) => A;
+  extractOutput: (A) => O;
+}
+
+// TODO: (Typescript) When typing this as ((a: I, b: I) => I), types are not inferred well.
+type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);
+
+/**
+ * A PTransform that takes a PCollection of elements, and returns a PCollection
+ * of elements grouped by a field, multiple fields, an expression that is used
+ * as the grouping key.
+ *
+ * @extends PTransform
+ */
+export class GroupBy<T, K> extends PTransform<
+  PCollection<T>,
+  PCollection<KV<K, Iterable<T>>>
+> {
+  keyFn: (element: T) => K;
+  keyNames: string | string[];
+  keyName: string;
+
+  /**
+   * Create a GroupBy transform.
+   *
+   * @param key: The name of the key in the JSON object, or a function that returns the key for a given element.
+   */
+  constructor(
+    key: string | string[] | ((element: T) => K),
+    keyName: string | undefined = undefined
+  ) {
+    super();
+    [this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key");
+    this.keyName = typeof this.keyNames == "string" ? this.keyNames : "key";
+  }
+
+  expand(input: PCollection<T>): PCollection<KV<K, Iterable<T>>> {
+    const keyFn = this.keyFn;
+    return input
+      .map((x) => ({ key: keyFn(x), value: x }))
+      .apply(new internal.GroupByKey());
+  }
+
+  combining<I>(
+    expr: string | ((element: T) => I),
+    combiner: Combiner<I>,
+    resultName: string
+  ) {
+    return new GroupByAndCombine(this.keyFn, this.keyNames, []).combining(
+      expr,
+      combiner,
+      resultName
+    );
+  }
+}
+
+/**
+ * Groups all elements of the input PCollection together.
+ *
+ * This is generally used with one or more combining specifications, as one
+ * looses parallelization benefits in bringing all elements of a distributed

Review Comment:
   ```suggestion
    * loses parallelization benefits in bringing all elements of a distributed
   ```
   
   Nit



##########
sdks/typescript/src/apache_beam/transforms/transform.ts:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import { PValue } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+
+export function withName<T>(name: string | (() => string), arg: T): T {
+  (arg as any).beamName = name;
+  return arg;
+}
+
+export function extractName<T>(withName: T): string {
+  const untyped = withName as any;
+  if (untyped.beamName != undefined) {
+    if (typeof untyped.beamName == "string") {
+      return untyped.beamName;
+    } else {
+      return untyped.beamName();
+    }
+  } else if (
+    untyped.name != undefined &&
+    untyped.name &&

Review Comment:
   ```suggestion
       untyped.name &&
   ```
   
   The first check here is superfluous: if `untyped.name` is undefined, the bare `untyped.name` check already evaluates to false.
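
To illustrate why the explicit comparison adds nothing, here is a small self-contained check. The helper names `hasNameVerbose` and `hasNameShort` are hypothetical; they mirror the two forms of the condition and agree for every sample value.

```typescript
// Mirrors the original condition: explicit undefined check plus truthiness.
const hasNameVerbose = (name: unknown): boolean => name != undefined && !!name;
// Mirrors the suggested condition: truthiness alone.
const hasNameShort = (name: unknown): boolean => !!name;

const samples: unknown[] = [undefined, null, "", "MyTransform", 0, NaN, {}];
for (const s of samples) {
  // Both columns print the same boolean for each sample.
  console.log(String(s), hasNameVerbose(s), hasNameShort(s));
}
```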



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   This code is duplicated a number of times, with the only difference being the URN. Consider pulling it out into its own function.
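
One possible shape for such a function, as a sketch only. The interfaces, the `PAR_DO_URN` constant, the encoder, and the example URN strings are stand-ins invented for this illustration; the PR itself uses the generated `runnerApi.FunctionSpec` and `runnerApi.ParDoPayload` types, `ParDo.urn`, and `serializeFn`. The hypothetical helper `jsDoFnParDoSpec` takes the DoFn URN and the already-serialized payload, since the payload contents also vary between call sites.

```typescript
// Stand-in types for this sketch only (assumptions, not the generated protos).
interface FunctionSpec {
  urn: string;
  payload: Uint8Array;
}
interface ParDoPayload {
  doFn: FunctionSpec;
}

// Placeholder for ParDo.urn.
const PAR_DO_URN = "example:pardo";

// Placeholder encoder standing in for runnerApi.ParDoPayload.toBinary.
function encodeParDoPayload(p: ParDoPayload): Uint8Array {
  const text = p.doFn.urn + ":" + p.doFn.payload.length;
  return new Uint8Array(text.split("").map((c) => c.charCodeAt(0)));
}

// Hypothetical helper: builds the ParDo spec once, parameterized by the
// DoFn URN and its serialized payload.
function jsDoFnParDoSpec(doFnUrn: string, payload: Uint8Array): FunctionSpec {
  return {
    urn: PAR_DO_URN,
    payload: encodeParDoPayload({ doFn: { urn: doFnUrn, payload } }),
  };
}

// Each call site would then shrink to a single call with its own URN.
const spec = jsDoFnParDoSpec("example:js_window_into", new Uint8Array([1, 2, 3]));
console.log(spec.urn, spec.payload.length);
```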





[GitHub] [beam] ihji commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r862252768


##########
sdks/typescript/boot.go:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+)
+
+var (
+	// Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+	id                = flag.String("id", "", "Local identifier (required).")
+	loggingEndpoint   = flag.String("logging_endpoint", "", "Local logging endpoint for FnHarness (required).")
+	artifactEndpoint  = flag.String("artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).")
+	provisionEndpoint = flag.String("provision_endpoint", "", "Local provision endpoint for FnHarness (required).")
+	controlEndpoint   = flag.String("control_endpoint", "", "Local control endpoint for FnHarness (required).")
+	semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
+)
+
+const entrypoint = "dist/worker/worker_main.js"
+
+func main() {
+	flag.Parse()
+	if *id == "" {
+		log.Fatal("No id provided.")
+	}
+	if *provisionEndpoint == "" {
+		log.Fatal("No provision endpoint provided.")
+	}
+
+	ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+	info, err := provision.Info(ctx, *provisionEndpoint)
+	if err != nil {
+		log.Fatalf("Failed to obtain provisioning information: %v", err)
+	}
+	log.Printf("Provision info:\n%v", info)
+
+	// TODO(BEAM-8201): Simplify once flags are no longer used.

Review Comment:
   The Dataflow provisioning service doesn't provide this endpoint information yet. The endpoints are still passed as arguments.





[GitHub] [beam] damccorm commented on pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on PR #17341:
URL: https://github.com/apache/beam/pull/17341#issuecomment-1118547343

   Run Java PreCommit




[GitHub] [beam] codecov[bot] commented on pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #17341:
URL: https://github.com/apache/beam/pull/17341#issuecomment-1100302430

   # [Codecov](https://codecov.io/gh/apache/beam/pull/17341?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17341](https://codecov.io/gh/apache/beam/pull/17341?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (23cc336) into [master](https://codecov.io/gh/apache/beam/commit/ddd95c53738133fbb314cf9ba0ddd457774cfe28?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ddd95c5) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17341      +/-   ##
   ==========================================
   - Coverage   73.77%   73.76%   -0.01%     
   ==========================================
     Files         686      686              
     Lines       90125    90125              
   ==========================================
   - Hits        66486    66479       -7     
   - Misses      22469    22476       +7     
     Partials     1170     1170              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.67% <ø> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17341?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/17341/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-1.99%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/17341/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/17341/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `93.00% <0.00%> (+1.00%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/17341?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/17341?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [ddd95c5...23cc336](https://codecov.io/gh/apache/beam/pull/17341?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858885358


##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(

Review Comment:
   ```suggestion
       this.process = childProcess.spawnSync(
   ```
   
   Nit: Not really _necessary_, but I expect this will give us a better exception if there are issues with spawning the process itself
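
One caveat about the suggestion above, going by the Node.js documentation: `spawnSync` does not return until the child process has exited, and it returns a result object rather than a `ChildProcess`, so it may not suit a long-running service that `start()` later polls and `stop()` kills. A possible alternative, sketched here as an assumption rather than as the PR's approach, is to keep `spawn` and surface start-up failures through the child's `error` event. The helper name `spawnOrFail` is hypothetical, and the `spawn` event requires a reasonably recent Node.js release.

```typescript
import { spawn, ChildProcess } from "child_process";

// Hypothetical helper, not from the PR: resolves once the child has been
// spawned and rejects if the process could not be started at all.
function spawnOrFail(cmd: string, args: string[]): Promise<ChildProcess> {
  return new Promise((resolve, reject) => {
    const child = spawn(cmd, args, { stdio: "inherit" });
    child.once("error", reject); // e.g. ENOENT when cmd does not exist
    child.once("spawn", () => resolve(child)); // emitted after a successful spawn
  });
}

// A missing executable now produces a rejected promise with a clear error.
spawnOrFail("definitely-not-a-real-command", []).catch((err) => {
  console.error("Failed to start service:", err.message);
});
```

With this shape, `start()` could await `spawnOrFail(...)` before calling `portReady`, so a missing executable is reported immediately instead of after the port timeout.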



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";

Review Comment:
   I don't think we want this line here anymore: it unconditionally overwrites the `version` computed just above.
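
   As a sketch of what the version handling reduces to once the hard-coded override is dropped (the helper name `javaBeamVersion` is hypothetical, introduced only for illustration):

   ```typescript
   // Hypothetical helper: maps the TypeScript SDK version to the Beam Java
   // release to download, without pinning a fixed release.
   function javaBeamVersion(sdkVersion: string): string {
     // node-ts 0.x corresponds to Beam 2.x, as in the diff above.
     return sdkVersion.startsWith("0.")
       ? "2" + sdkVersion.substring(1)
       : sdkVersion;
   }
   ```

   For example, an SDK version of `0.38.0` would map to `2.38.0`, while an explicit `2.36.0` passes through unchanged.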



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";
+    const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
+    const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
+    const projectRoot = path.resolve(
+      __dirname,
+      "..",
+      "..",
+      "..",
+      "..",
+      "..",
+      ".."
+    );

Review Comment:
   This is pretty fragile and likely to break if we restructure our directories, or even if we change where the JavaScript gets built to. Could we refactor it out into a shared constants class? It would be even better if we put a test in place to guarantee that it correctly points to the project root (e.g. that it's able to find the `.github` directory, or something else that can't move). Then if it breaks, it will at least be obvious.
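
   A rough sketch of the check described above, using a different technique from the PR: instead of counting `..` segments, walk upwards until a marker directory is found. The function name `findProjectRoot` and the default marker are assumptions for illustration.

   ```typescript
   import * as fs from "fs";
   import * as path from "path";

   // Hypothetical helper: walks up from startDir until a directory containing
   // `marker` is found, and fails loudly if the filesystem root is reached.
   function findProjectRoot(startDir: string, marker = ".github"): string {
     let dir = path.resolve(startDir);
     while (true) {
       if (fs.existsSync(path.join(dir, marker))) {
         return dir;
       }
       const parent = path.dirname(dir);
       if (parent === dir) {
         // dirname of the filesystem root is the root itself.
         throw new Error(`No ${marker} found above ${startDir}`);
       }
       dir = parent;
     }
   }
   ```

   A unit test could then assert that `findProjectRoot(__dirname)` agrees with the shared constant, so a directory restructuring shows up as a test failure rather than as a missing jar at runtime.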



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();

Review Comment:
   I think it makes more sense to let the caller do this (and the caller already does). Regardless, we probably don't need it in both places.
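
   The split of responsibilities could be sketched as follows. The names `Killable`, `waitUntil`, and `startService` are hypothetical stand-ins, and the readiness check is abstracted to a callback rather than a real socket connection.

   ```typescript
   // Hypothetical stand-in for the ChildProcess owned by SubprocessService.
   interface Killable {
     kill(): void;
   }

   // Polls `check` until it returns true. Throws on timeout, but performs no
   // cleanup itself.
   async function waitUntil(
     check: () => Promise<boolean>,
     timeoutMs: number,
     iterMs = 10
   ): Promise<void> {
     const start = Date.now();
     while (Date.now() - start < timeoutMs) {
       if (await check()) {
         return;
       }
       await new Promise((r) => setTimeout(r, iterMs));
     }
     throw new Error("Timed out waiting for service after " + timeoutMs + "ms.");
   }

   // The caller owns the process, so it is the only place that kills it.
   async function startService(
     proc: Killable,
     check: () => Promise<boolean>,
     timeoutMs: number
   ): Promise<void> {
     try {
       await waitUntil(check, timeoutMs);
     } catch (error) {
       proc.kill();
       throw error;
     }
   }
   ```

   Keeping the kill in one place means a timeout triggers exactly one `kill()` call, and the polling helper stays reusable for processes it does not own.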





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858663543


##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);

Review Comment:
   Instead of just logging, should we be returning an error response here?
   
   Structurally, it would probably be cleaner to move this if/else into the `process` function as well, so that this block doesn't bloat as we add more instruction types.
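
   A minimal sketch of such a dispatch, assuming simplified stand-ins for the generated proto types (the interfaces below and the `dispatch` function are hypothetical; the error-response shape mirrors the one already used in the `catch` block of `process`):

   ```typescript
   // Simplified, hypothetical stand-ins for the generated proto types.
   interface InstructionRequest {
     instructionId: string;
     request: { oneofKind: string | undefined };
   }
   interface InstructionResponse {
     instructionId: string;
     error: string;
     response: { oneofKind: string | undefined };
   }

   type Handler = (request: InstructionRequest) => InstructionResponse;

   // Dispatches on the instruction kind. Unknown kinds yield an error
   // response instead of only being logged.
   function dispatch(
     request: InstructionRequest,
     handlers: Map<string, Handler>
   ): InstructionResponse {
     const kind = request.request.oneofKind;
     const handler = kind === undefined ? undefined : handlers.get(kind);
     if (handler === undefined) {
       return {
         instructionId: request.instructionId,
         error: "Unknown instruction type: " + kind,
         response: { oneofKind: undefined },
       };
     }
     return handler(request);
   }
   ```

   New instruction types then become new entries in the handler map, and the `"data"` callback only has to write whatever `dispatch` returns.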



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);
+      }
+    });
+    this.controlChannel.on("end", () => {
+      console.log("Control channel closed.");
+      for (const dataChannel of this.dataChannels.values()) {
+        dataChannel.close();
+      }
+      for (const stateChannel of this.stateChannels.values()) {
+        stateChannel.close();
+      }
+    });
+  }
+
+  async wait() {
+    // TODO: Await closing of control log.
+    await new Promise((r) => setTimeout(r, 1e9));
+  }
+
+  respond(response: InstructionResponse) {
+    this.controlChannel.write(response);
+  }
+
+  async process(request) {
+    const descriptorId =
+      request.request.processBundle.processBundleDescriptorId;
+    console.log("process", request.instructionId, descriptorId);
+    try {
+      if (!this.processBundleDescriptors.has(descriptorId)) {
+        const call = this.controlClient.getProcessBundleDescriptor(
+          {
+            processBundleDescriptorId: descriptorId,
+          },
+          (err, value: ProcessBundleDescriptor) => {
+            if (err) {
+              this.respond({
+                instructionId: request.instructionId,
+                error: "" + err,
+                response: {
+                  oneofKind: "processBundle",
+                  processBundle: {
+                    residualRoots: [],
+                    monitoringInfos: [],
+                    requiresFinalization: false,
+                    monitoringData: {},
+                  },
+                },
+              });
+            } else {
+              this.processBundleDescriptors.set(descriptorId, value);
+              this.process(request);
+            }
+          }
+        );
+        return;
+      }
+
+      const processor = this.aquireBundleProcessor(descriptorId);
+      await processor.process(request.instructionId);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "",
+        response: {
+          oneofKind: "processBundle",
+          processBundle: {
+            residualRoots: [],
+            monitoringInfos: [],
+            requiresFinalization: false,
+            monitoringData: {},
+          },
+        },
+      });
+      this.returnBundleProcessor(processor);
+    } catch (error) {
+      console.error("PROCESS ERROR", error);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "" + error,
+        response: { oneofKind: undefined },
+      });
+    }
+  }
+
+  aquireBundleProcessor(descriptorId: string) {
+    if (!this.bundleProcessors.has(descriptorId)) {
+      this.bundleProcessors.set(descriptorId, []);
+    }
+    const processor = this.bundleProcessors.get(descriptorId)?.pop();
+    if (processor != undefined) {
+      return processor;
+    } else {
+      return new BundleProcessor(
+        this.processBundleDescriptors.get(descriptorId)!,
+        this.getDataChannel.bind(this),
+        this.getStateChannel.bind(this)
+      );
+    }
+  }
+
+  returnBundleProcessor(processor: BundleProcessor) {
+    this.bundleProcessors.get(processor.descriptor.id)?.push(processor);

Review Comment:
   If `this.bundleProcessors.get(processor.descriptor.id)` returns null/undefined, should we create the list instead of silently dropping the processor?
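
   One possible shape for a get-or-create pool, sketched with a hypothetical `ProcessorPool` class and a reduced `Pooled` interface rather than the real `BundleProcessor`:

   ```typescript
   // Hypothetical minimal shape; the real BundleProcessor carries much more.
   interface Pooled {
     descriptorId: string;
   }

   class ProcessorPool<P extends Pooled> {
     private idle = new Map<string, P[]>();

     // Returns an idle processor for the descriptor, if one is cached.
     acquire(descriptorId: string): P | undefined {
       return this.idle.get(descriptorId)?.pop();
     }

     // Caches the processor, creating the per-descriptor list on first use
     // so that a returned processor is never silently dropped.
     release(processor: P): void {
       let list = this.idle.get(processor.descriptorId);
       if (list === undefined) {
         list = [];
         this.idle.set(processor.descriptorId, list);
       }
       list.push(processor);
     }
   }
   ```

   With the creation folded into `release`, the acquire path no longer needs to pre-populate the map for the return path to work.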



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   ```suggestion
       } else if (this.consumers.get(bundleId)!.has(transformId)) {
   ```
   
   Nit: we can save a check here, since a newly created map cannot already contain `transformId`.
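
   For the `else if` to parse, the closing brace of the preceding `if` has to merge into the same statement. A sketch of the restructured block as a standalone function follows; `register` is a hypothetical name, and throwing on a duplicate registration is an assumption, since the body of the original branch is not shown in this hunk.

   ```typescript
   type Consumers<C> = Map<string, Map<string, C>>;

   // Hypothetical standalone version of the registration logic.
   function register<C>(
     consumers: Consumers<C>,
     bundleId: string,
     transformId: string,
     consumer: C
   ): void {
     let perBundle = consumers.get(bundleId);
     if (perBundle === undefined) {
       // A freshly created map cannot contain transformId, so no lookup
       // is needed on this path.
       perBundle = new Map<string, C>();
       consumers.set(bundleId, perBundle);
     } else if (perBundle.has(transformId)) {
       // Assumption: duplicates are treated as an error.
       throw new Error(
         "Consumer already registered for " + bundleId + "/" + transformId
       );
     }
     perBundle.set(transformId, consumer);
   }
   ```

   Holding the inner map in a local variable also avoids the repeated `this.consumers.get(bundleId)!` lookups and the non-null assertions that come with them.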



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promises and awaiting as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);

Review Comment:
   ```suggestion
       throw new Error("Unknown transform type:" + transform.spec!.urn);
   ```
   
   The `transform.spec!.urn!` access two lines earlier already throws if spec is null/undefined, so the optional chaining here is unnecessary.
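
To illustrate the point, here is a minimal sketch. The `Spec` and `Transform` interfaces and the `lookupUrn` helper are hypothetical stand-ins, not the SDK's generated proto types. The `!` operator is only a compile-time assertion, so the property access still throws a TypeError at runtime when `spec` is undefined, and any later `?.` on the same path is never reached in that case.

```typescript
// Hypothetical stand-ins for the generated proto types.
interface Spec {
  urn?: string;
}
interface Transform {
  spec?: Spec;
}

function lookupUrn(transform: Transform): string {
  // `!` is erased at compile time; if spec is undefined, this property
  // access throws a TypeError at runtime.
  const urn = transform.spec!.urn;
  if (urn === undefined) {
    // Reaching this line means spec was defined, so `?.` would add nothing.
    throw new Error("Unknown transform type: " + transform.spec!.urn);
  }
  return urn;
}

// Small helper so the failure can be observed without crashing the script.
function didThrowTypeError(fn: () => unknown): boolean {
  try {
    fn();
    return false;
  } catch (e) {
    return e instanceof TypeError;
  }
}

const missingSpecThrows = didThrowTypeError(() => lookupUrn({}));
const presentSpecUrn = lookupUrn({ spec: { urn: "beam:transform:flatten:v1" } });
```

Here `missingSpecThrows` reflects the early failure on a missing spec, while `presentSpecUrn` shows the normal path.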



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promises and awaiting them as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+  registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+    return new cls(transformId, transformProto, context);
+  });
+}
+
+export function registerOperatorConstructor(
+  urn: string,
+  constructor: OperatorConstructor
+) {
+  operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// for potential SDK authors who are less familiar with JavaScript.
+
+class DataSourceOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  receiver: Receiver;
+  coder: Coder<WindowedValue<unknown>>;
+  endOfData: Promise<void>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      readPort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    this.coder = context.pipelineContext.getCoder(readPort.coderId);
+  }
+
+  async startBundle() {
+    const this_ = this;
+    var endOfDataResolve, endOfDataReject;
+    this.endOfData = new Promise(async (resolve, reject) => {
+      endOfDataResolve = resolve;
+      endOfDataReject = reject;
+    });
+
+    await this_.multiplexingDataChannel.registerConsumer(
+      this_.getBundleId(),
+      this_.transformId,
+      {
+        sendData: async function (data: Uint8Array) {
+          console.log("Got", data);
+          const reader = new protobufjs.Reader(data);
+          while (reader.pos < reader.len) {
+            const maybePromise = this_.receiver.receive(
+              this_.coder.decode(reader, CoderContext.needsDelimiters)
+            );
+            if (maybePromise != NonPromise) {
+              await maybePromise;
+            }
+          }
+        },
+        sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+          throw Error("Not expecting timers.");
+        },
+        close: function () {
+          endOfDataResolve();
+        },
+        onError: function (error: Error) {
+          endOfDataReject(error);
+        },
+      }
+    );
+  }
+
+  process(wvalue: WindowedValue<unknown>): ProcessResult {
+    throw Error("Data should not come in via process.");
+  }
+
+  async finishBundle() {
+    try {
+      await this.endOfData;
+    } finally {
+      this.multiplexingDataChannel.unregisterConsumer(
+        this.getBundleId(),
+        this.transformId
+      );
+    }
+  }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  channel: IDataChannel;
+  coder: Coder<WindowedValue<unknown>>;
+  buffer: protobufjs.Writer;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      writePort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.coder = context.pipelineContext.getCoder(writePort.coderId);
+  }
+
+  async startBundle() {
+    this.channel = this.multiplexingDataChannel.getSendChannel(
+      this.getBundleId(),
+      this.transformId
+    );
+    this.buffer = new protobufjs.Writer();
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+    if (this.buffer.len > 1e6) {
+      return this.flush();
+    }
+    return NonPromise;
+  }
+
+  async finishBundle() {
+    await this.flush();
+    this.channel.close();
+  }
+
+  async flush() {
+    if (this.buffer.len > 0) {
+      await this.channel.sendData(this.buffer.finish());
+      this.buffer = new protobufjs.Writer();
+    }
+  }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+  receiver: Receiver;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+  }
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+  private doFn: DoFn<unknown, unknown, unknown>;
+  private getStateProvider: () => StateProvider;
+  private sideInputInfo: Map<string, SideInputInfo> = new Map();
+  private originalContext: object | undefined;
+  private augmentedContext: object | undefined;
+  private paramProvider: ParamProviderImpl;
+
+  constructor(
+    private transformId: string,
+    private receiver: Receiver,
+    private spec: runnerApi.ParDoPayload,
+    private payload: {
+      doFn: DoFn<unknown, unknown, unknown>;
+      context: any;
+    },
+    transformProto: runnerApi.PTransform,
+    operatorContext: OperatorContext
+  ) {
+    this.doFn = payload.doFn;
+    this.originalContext = payload.context;
+    this.getStateProvider = operatorContext.getStateProvider;
+    this.sideInputInfo = createSideInputInfo(
+      transformProto,
+      spec,
+      operatorContext
+    );
+  }
+
+  async startBundle() {
+    this.paramProvider = new ParamProviderImpl(
+      this.transformId,
+      this.sideInputInfo,
+      this.getStateProvider
+    );
+    this.augmentedContext = this.paramProvider.augmentContext(
+      this.originalContext
+    );
+    if (this.doFn.startBundle) {
+      this.doFn.startBundle(this.augmentedContext);
+    }
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    if (this.augmentedContext && wvalue.windows.length != 1) {
+      // We need to process each window separately.
+      // TODO: (Perf) We could inspect the context more deeply and allow some
+      // cases to go through.
+      const result = new ProcessResultBuilder();
+      for (const window of wvalue.windows) {
+        result.add(
+          this.process({
+            value: wvalue.value,
+            windows: [window],
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      return result.build();
+    }
+
+    const this_ = this;
+    function reallyProcess(): ProcessResult {
+      const doFnOutput = this_.doFn.process(
+        wvalue.value,
+        this_.augmentedContext
+      );
+      if (!doFnOutput) {
+        return NonPromise;
+      }
+      const result = new ProcessResultBuilder();
+      for (const element of doFnOutput) {
+        result.add(
+          this_.receiver.receive({
+            value: element,
+            windows: wvalue.windows,
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      this_.paramProvider.update(undefined);
+      return result.build();
+    }
+
+    // Update the context with any information specific to this window.
+    const updateContextResult = this.paramProvider.update(wvalue);
+
+    // If we were able to do so without any deferred actions, process the
+    // element immediately.
+    if (updateContextResult == NonPromise) {
+      return reallyProcess();
+    } else {
+      // Otherwise return a promise that first waits for all the deferred
+      // actions to complete and then process the element.
+      return (async () => {
+        await updateContextResult;
+        const update2 = this.paramProvider.update(wvalue);
+        if (update2 != NonPromise) {
+          throw new Error("Expected all promises to be resolved: " + update2);
+        }
+        await reallyProcess();
+      })();
+    }
+  }
+
+  async finishBundle() {
+    if (this.doFn.finishBundle) {
+      const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+      if (!finishBundleOutput) {
+        return;
+      }
+      // The finishBundle method must return `void` or a Generator<WindowedValue<OutputT>>. It may not
+      // return Generator<OutputT> without windowing information because a single bundle may contain
+      // elements from different windows, so each element must specify its window.
+      for (const element of finishBundleOutput) {
+        const maybePromise = this.receiver.receive(element);
+        if (maybePromise != NonPromise) {
+          await maybePromise;
+        }
+      }
+    }
+  }
+}
+
+class IdentityParDoOperator implements IOperator {
+  constructor(private receiver: Receiver) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+  constructor(
+    private splitter: (any) => string,
+    private receivers: { [key: string]: Receiver }
+  ) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const tag = this.splitter(wvalue.value);
+    const receiver = this.receivers[tag];
+    if (receiver) {
+      return receiver.receive(wvalue);
+    } else {
+      // TODO: (API) Make this configurable.
+      throw new Error(
+        "Unexpected tag '" +
+          tag +
+          "' for " +
+          wvalue.value +
+          " not in " +
+          [...Object.keys(this.receivers)]
+      );
+    }
+  }
+
+  async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+  constructor(private receivers: { [key: string]: Receiver }) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const result = new ProcessResultBuilder();
+    // TODO: (API) Should we require exactly one tag instead of allowing a union?
+    for (const tag of Object.keys(wvalue.value as object)) {
+      const receiver = this.receivers[tag];
+      if (receiver) {
+        result.add(
+          receiver.receive({
+            value: (wvalue.value as object)[tag],
+            windows: wvalue.windows,
+            timestamp: wvalue.timestamp,
+            pane: wvalue.pane,
+          })
+        );
+      } else {
+        // TODO: (API) Make this configurable.
+        throw new Error(
+          "Unexpected tag '" +
+            tag +
+            "' for " +
+            wvalue.value +
+            " not in " +
+            [...Object.keys(this.receivers)]
+        );
+      }
+    }
+    return result.build();
+  }
+
+  async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+    if (newWindowsOnce.length > 0) {
+      const newWindows: Window[] = [];
+      for (var i = 0; i < wvalue.windows.length; i++) {
+        newWindows.push(...newWindowsOnce);

Review Comment:
   Could someone help me understand what this block is doing? It seems kinda odd to me - is the goal to end up with newWindowsOnce repeated `wvalue.windows.length` times, and if so - why?
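
For reference, a sketch of what the loop computes, using a simplified and hypothetical window type: it yields `newWindowsOnce` concatenated once per input window. One plausible reading, which is an assumption and not something the diff confirms, is that a value in N windows stands for N logical elements, each of which is re-windowed.

```typescript
// Simplified, hypothetical window type; the SDK's Window is richer.
type SimpleWindow = { start: number; end: number };

// Mirrors the loop in the diff: one copy of newWindowsOnce per input window.
function repeatPerInputWindow(
  inputWindows: SimpleWindow[],
  newWindowsOnce: SimpleWindow[]
): SimpleWindow[] {
  const newWindows: SimpleWindow[] = [];
  for (let i = 0; i < inputWindows.length; i++) {
    newWindows.push(...newWindowsOnce);
  }
  return newWindows;
}

const repeatedWindows = repeatPerInputWindow(
  [
    { start: 0, end: 10 },
    { start: 5, end: 15 },
  ],
  [{ start: 0, end: 60 }]
);
// repeatedWindows holds the single new window twice, once per input window.
```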



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promises and awaiting them as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {

Review Comment:
   ```suggestion
   export function registerOperator(urn: string, cls: OperatorClass): void {
   ```



##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import * as fnApi from "../proto/beam_fn_api";
+import { BeamFnStateClient } from "../proto/beam_fn_api.grpc-client";
+
+// TODO: (Extension) Lazy iteration via continuation tokens.
+// This will likely require promises all the way up to the consumer.
+
+interface PromiseWrapper<T> {
+  type: "promise";
+  promise: Promise<T>;
+}
+
+interface ValueWrapper<T> {
+  type: "value";
+  value: T;
+}
+
+// We want to avoid promises when not needed (e.g. for a cache hit) as they
+// have to bubble all the way up the stack.
+export type MaybePromise<T> = PromiseWrapper<T> | ValueWrapper<T>;
+
+export interface StateProvider {
+  getState: <T>(
+    stateKey: fnApi.StateKey,
+    decode: (data: Uint8Array) => T
+  ) => MaybePromise<T>;
+}
+
+// TODO: (Advanced) Cross-bundle caching.
+export class CachingStateProvider implements StateProvider {
+  underlying: StateProvider;
+  cache: Map<string, MaybePromise<any>> = new Map();
+
+  constructor(underlying: StateProvider) {
+    this.underlying = underlying;
+  }
+
+  getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
+    // TODO: (Perf) Consider caching on something lighter-weight than the full
+    // serialized key, only constructing this proto when interacting with
+    // the runner.
+    const cacheKey = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64"
+    );
+    if (this.cache.has(cacheKey)) {
+      return this.cache.get(cacheKey)!;
+    } else {

Review Comment:
   Nit: It is probably cleaner to drop this `else` and unindent the block, since the `if` branch returns early.
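
A minimal sketch of the early-return shape, using a hypothetical `SimpleCache` class rather than the SDK's `CachingStateProvider`:

```typescript
// Hypothetical, simplified cache; not the SDK's CachingStateProvider.
class SimpleCache {
  private cache: Map<string, number> = new Map();
  fetchCount = 0;

  constructor(private fetch: (key: string) => number) {}

  get(key: string): number {
    const cached = this.cache.get(key);
    if (cached !== undefined) {
      return cached; // Early return: no `else` is needed below.
    }
    // Cache miss: compute, remember, and return the value.
    this.fetchCount++;
    const value = this.fetch(key);
    this.cache.set(key, value);
    return value;
  }
}

const lengthCache = new SimpleCache((key) => key.length);
const firstLookup = lengthCache.get("beam");
const secondLookup = lengthCache.get("beam");
```

The miss path sits at the function's top indentation level, and the second lookup is served from the cache without calling `fetch` again.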



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {
+      await (
+        this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+      ).flush(consumer);
+    }
+    this.consumers.get(bundleId)!.set(transformId, consumer);
+  }
+
+  unregisterConsumer(bundleId: string, transformId: string) {
+    this.consumers.get(bundleId)!.delete(transformId);
+  }
+
+  getConsumer(bundleId: string, transformId: string): IDataChannel {
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (!this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   Nit:
   
   ```suggestion
       } else if (!this.consumers.get(bundleId)!.has(transformId)) {
   ```
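
One thing worth checking before applying this: if the body of the second `if` creates the per-transform entry, `else if` would skip it whenever the bundle map was just created. The diff is cut off before that body, so the sketch below is a guess at the shape; `NestedRegistry` and its methods are hypothetical.

```typescript
// Hypothetical registry; the entry type and method bodies are assumptions,
// since the diff is cut off before the body of the second `if`.
class NestedRegistry {
  private entries: Map<string, Map<string, string[]>> = new Map();

  // Two independent checks: a brand-new bundle also gets its inner entry.
  getWithTwoIfs(bundleId: string, transformId: string): string[] | undefined {
    if (!this.entries.has(bundleId)) {
      this.entries.set(bundleId, new Map());
    }
    if (!this.entries.get(bundleId)!.has(transformId)) {
      this.entries.get(bundleId)!.set(transformId, []);
    }
    return this.entries.get(bundleId)!.get(transformId);
  }

  // With `else if`, the inner entry is skipped when the bundle was just created.
  getWithElseIf(bundleId: string, transformId: string): string[] | undefined {
    if (!this.entries.has(bundleId)) {
      this.entries.set(bundleId, new Map());
    } else if (!this.entries.get(bundleId)!.has(transformId)) {
      this.entries.get(bundleId)!.set(transformId, []);
    }
    return this.entries.get(bundleId)!.get(transformId);
  }
}

const twoIfsResult = new NestedRegistry().getWithTwoIfs("bundle-1", "transform-1");
const elseIfResult = new NestedRegistry().getWithElseIf("bundle-1", "transform-1");
```

On a first call for a new bundle, the two-`if` version returns an empty entry while the `else if` version returns `undefined`.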



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858955161


##########
sdks/typescript/src/apache_beam/testing/assert.ts:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../../apache_beam";
+import { GlobalWindows } from "../../apache_beam/transforms/windowings";
+import * as internal from "../transforms/internal";
+
+import * as assert from "assert";
+
+// TODO(serialization): See if we can avoid this.
+function callAssertDeepEqual(a, b) {
+  return assert.deepEqual(a, b);
+}
+
+// TODO: (Naming)
+export class AssertDeepEqual extends beam.PTransform<
+  beam.PCollection<any>,
+  void
+> {
+  expected: any[];
+
+  constructor(expected: any[]) {
+    super("AssertDeepEqual");
+    this.expected = expected;
+  }
+
+  expand(pcoll: beam.PCollection<any>) {
+    const expected = this.expected;
+    pcoll.apply(
+      new Assert("Assert", (actual) => {
+        // Is there a less explicit way to do this?
+        const actualArray: any[] = [];
+        for (const a of actual) {
+          actualArray.push(a);
+        }
+        expected.sort();
+        actualArray.sort();
+        callAssertDeepEqual(actualArray, expected);
+      })
+    );
+  }
+}
+
+export class Assert extends beam.PTransform<beam.PCollection<any>, void> {
+  check: (actual: any[]) => void;
+
+  constructor(name: string, check: (actual: any[]) => void) {
+    super(name);
+    this.check = check;
+  }
+
+  expand(pcoll: beam.PCollection<any>) {
+    const check = this.check;
+    // We provide some value here to ensure there is at least one element
+    // so the DoFn gets invoked.
+    const singleton = pcoll
+      .root()
+      .apply(new beam.Impulse())
+      .map((_) => ({ tag: "expected" }));
+    // CoGBK.
+    const tagged = pcoll
+      .map((e) => ({ tag: "actual", value: e }))
+      .apply(new beam.WindowInto(new GlobalWindows()));
+    beam
+      .P([singleton, tagged])
+      .apply(new beam.Flatten())
+      .map((e) => ({ key: 0, value: e }))
+      .apply(new internal.GroupByKey()) // TODO: GroupBy.
+      .map(
+        beam.withName("extractActual", (kv) => {
+          // Javascript list comprehension?
+          const actual: any[] = [];
+          for (const o of kv.value) {
+            if (o.tag == "actual") {
+              actual.push(o.value);
+            }
+          }

Review Comment:
   ```suggestion
             const actual: any[] = kv.value?.filter(o => o.tag === "actual").map(o => o.value) || [];
   ```
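
A sketch comparing the loop with the chained form, using a hypothetical `Tagged` shape modeled on the `{ tag, value }` objects in the diff. It assumes the grouped values arrive as an array; if they are a general iterable, they would need to be copied into an array first, since `filter` and `map` are array methods.

```typescript
// Hypothetical element shape, modeled on the diff's { tag, value } objects.
interface Tagged {
  tag: string;
  value?: number;
}

// The explicit loop from the diff.
function extractActualLoop(values: Tagged[]): (number | undefined)[] {
  const actual: (number | undefined)[] = [];
  for (const o of values) {
    if (o.tag === "actual") {
      actual.push(o.value);
    }
  }
  return actual;
}

// The chained equivalent: keep the "actual" entries, then project the value.
function extractActualChained(values: Tagged[]): (number | undefined)[] {
  return values.filter((o) => o.tag === "actual").map((o) => o.value);
}

const groupedValues: Tagged[] = [
  { tag: "expected" },
  { tag: "actual", value: 1 },
  { tag: "actual", value: 2 },
];
const fromLoop = extractActualLoop(groupedValues);
const fromChain = extractActualChained(groupedValues);
```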



##########
sdks/typescript/src/apache_beam/testing/assert.ts:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../../apache_beam";
+import { GlobalWindows } from "../../apache_beam/transforms/windowings";
+import * as internal from "../transforms/internal";
+
+import * as assert from "assert";
+
+// TODO(serialization): See if we can avoid this.
+function callAssertDeepEqual(a, b) {
+  return assert.deepEqual(a, b);
+}
+
+// TODO: (Naming)
+export class AssertDeepEqual extends beam.PTransform<
+  beam.PCollection<any>,
+  void
+> {
+  expected: any[];
+
+  constructor(expected: any[]) {
+    super("AssertDeepEqual");
+    this.expected = expected;
+  }
+
+  expand(pcoll: beam.PCollection<any>) {
+    const expected = this.expected;
+    pcoll.apply(
+      new Assert("Assert", (actual) => {
+        // Is there a less explicit way to do this?
+        const actualArray: any[] = [];
+        for (const a of actual) {
+          actualArray.push(a);
+        }

Review Comment:
   ```suggestion
           const actualArray: any[] = [...actual];
   ```
   
   > // Is there a less explicit way to do this?
   
   Yes :)
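
A minimal sketch of the copy idioms, assuming the collection (`actual` in the diff above) is any iterable. The helper `toArray` and the `sample` generator are hypothetical names used only for illustration:

```typescript
// Copies any iterable into a fresh array using spread syntax.
function toArray<T>(actual: Iterable<T>): T[] {
  return [...actual];
}

// A generator stands in for a non-array iterable.
function* sample(): Generator<number> {
  yield 3;
  yield 1;
  yield 2;
}

const viaSpread = toArray(sample());
const viaFrom = Array.from(sample()); // equivalent built-in alternative
console.log(viaSpread, viaFrom);
```

Spreading a non-array iterable needs a compile target of ES2015 or later (or `downlevelIteration`); `Array.from` is the usual alternative.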





[GitHub] [beam] damccorm commented on a diff in pull request #17341: [BEAM-1754] Adds experimental Typescript Beam SDK

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858970264


##########
sdks/typescript/README.md:
##########
@@ -0,0 +1,208 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Typescript Beam SDK
+
+This is the start of a fully functioning Javascript (actually, Typescript) SDK.
+There are two distinct aims with this SDK:
+
+1. Tap into the large (and relatively underserved, by existing data processing
+frameworks) community of javascript developers with a native SDK targeting this language.
+
+1. Develop a new SDK which can serve both as a proof of concept and reference
+that highlights the (relative) ease of porting Beam to new languages,
+a differentiating feature of Beam and Dataflow.
+
+To accomplish this, we lean heavily on the portability framework.
+For example, we make heavy use of cross-language transforms,
+in particular for IOs.
+In addition, the direct runner is simply an extension of the worker suitable
+for running on portable runners such as the ULR, which will directly transfer
+to running on production runners such as Dataflow and Flink.
+The target audience should hopefully not be put off by running other-language
+code encapsulated in docker images.
+
+## API
+
+We generally try to apply the concepts from the Beam API in a Typescript
+idiomatic way, but it should be noted that few of the initial developers
+have extensive (if any) Javascript/Typescript development experience, so
+feedback is greatly appreciated.
+
+In addition, some notable departures are taken from the traditional SDKs:
+
+* We take a "relational foundations" approach, where
+[schema'd data](https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf)
+is the primary way to interact with data, and we generally eschew the key-value
+requiring transforms in favor of a more flexible approach naming fields or
+expressions. Javascript's native Object is used as the row type.
+
+* As part of being schema-first we also de-emphasize Coders as a first-class
+concept in the SDK, relegating it to an advanced feature used for interop.
+Though we can infer schemas from individual elements, it is still TBD to
+figure out if/how we can leverage the type system and/or function introspection
+to regularly infer schemas at construction time. A fallback coder using BSON
+encoding is used when we don't have sufficient type information.
+
+* We have added additional methods to the PCollection object, notably `map`
+and `flatMap`, [rather than only allowing apply](https://www.mail-archive.com/dev@beam.apache.org/msg06035.html).
+In addition, `apply` can accept a function argument `(PCollection) => ...` as
+well as a PTransform subclass, which treats this callable as if it were a
+PTransform's expand.
+
+* In the other direction, we have eliminated the
+[problematic Pipeline object](https://s.apache.org/no-beam-pipeline)
+from the API, instead providing a `Root` PValue on which pipelines are built,
+and invoking run() on a Runner.  We offer a less error-prone `Runner.run`
+which finishes only when the pipeline is completely finished as well as
+`Runner.runAsync` which returns a handle to the running pipeline.
+
+* Rather than introduce PCollectionTuple, PCollectionList, etc. we let PValue
+literally be an
+[array or object with PValue values](https://github.com/robertwb/beam-javascript/blob/de4390dd767f046903ac23fead5db333290462db/sdks/node-ts/src/apache_beam/pvalue.ts#L116)
+which transforms can consume or produce.
+These are applied by wrapping them with the `P` operator, e.g.
+`P([pc1, pc2, pc3]).apply(new Flatten())`.
+
+* Like Python, `flatMap` and `ParDo.process` return multiple elements by
+yielding them from a generator, rather than invoking a passed-in callback.
+TBD how to output to multiple distinct PCollections.
+There is currently an operation to split a PCollection into multiple
+PCollections based on the properties of the elements, and
+we may consider using a callback for side outputs.
+
+* The `map`, `flatMap`, and `ParDo.process` methods take an additional
+(optional) context argument, which is similar to the keyword arguments
+used in Python. These can be "ordinary" javascript objects (which are passed
+as is) or special DoFnParam objects which provide getters to element-specific
+information (such as the current timestamp, window, or side input) at runtime.
+
+* Javascript supports (and encourages) an asynchronous programming model, with
+many libraries requiring use of the async/await paradigm.
+As there is no way (by design) to go from the asynchronous style back to
+the synchronous style, this needs to be taken into account
+when designing the API.
+We currently offer asynchronous variants of `PValue.apply(...)` (in addition
+to the synchronous ones, as they are easier to chain) as well as making
+`Runner.run` asynchronous. TBD to do this for all user callbacks as well.
+
+An example pipeline can be found at https://github.com/robertwb/beam-javascript/blob/javascript/sdks/node-ts/src/apache_beam/examples/wordcount.ts
+
+## TODO
+
+This SDK is a work in progress. In January 2022 we developed the ability to
+construct and run basic pipelines (including external transforms and running
+on a portable runner) but the following big-ticket items remain.
+
+* Containerization
+
+  * Function and object serialization: we currently only support "loopback"
+  mode; to be able to run in a remote, distributed manner we need to finish up
+  the work in pickling closures and DoFn objects. Some investigation has been
+  started here, but all existing libraries have non-trivial drawbacks.
+
+  * Finish the work in building a full SDK container image that starts
+  the worker.
+
+  * Actually use worker threads for multiple bundles.
+
+* API
+
+  * There are several TODOs of minor features or design decisions to finalize.
+
+    * Consider using (or supporting) 2-arrays rather than {key, value} objects
+      for KVs.
+
+    * Consider renaming map/flatMap to doMap/doFlatMap to avoid confusion with
+    Array.map that takes a key as a second callback argument.
+    Or force the second argument to be an Object, which would lead to a less
+    confusing API and clean up the implementation.
+    Also add a [do]Filter, and possibly a [do]Reduce?
+
+    * Move away from using classes.
+
+  * Add the ability to set good PTransform names, and ideally infer good
+  defaults.
+
+  * Advanced features like metrics, state, timers, and SDF.
+  Possibly some of these can wait.
+
+* Infrastructure
+
+  * Gradle and Jenkins integration for tests and style enforcement.
+
+* Other
+
+  * Enforce unique names for pipeline update.
+
+  * PipelineOptions should be a Javascript Object, not a proto Struct.
+
+  * Though Dataflow Runner v2 supports portability, submission is still done
+  via v1beta3 and interaction with GCS rather than the job submission API.
+
+  * Cleanup uses of var, this. Arrow functions. `===` vs `==`.
+
+  * Avoid `any` return types (and re-enable check in compiler).
+
+  * Relative vs. absolute imports, possibly via setting a base url with a
+  `jsconfig.json`.  Also remove imports from base.ts.
+
+  * More/better tests, including tests of illegal/unsupported use.
+
+  * Set channel options like `grpc.max_{send,receive}_message_length` as we
+  do in other SDKs.
+
+  * Reduce use of any.
+
+    * Could use `unknown` in its place where the type is truly unknown.
+
+    * It'd be nice to enforce, maybe re-enable `noImplicitAny: true` in
+    tsconfig if we can get the generated proto files to be ignored.
+
+  * Enable a linter like eslint and fix at least the low hanging fruit.
+
+There is probably more; there are many TODOs littered throughout the code.
+
+This code has also not yet been fully peer reviewed (it was the result of a
+hackathon) which needs to be done before putting it into the main repository.
+
+
+## Development.

Review Comment:
   ```suggestion
   ## Development.
   ```
   
   We should cut this before merging


