You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/07/09 16:24:06 UTC
[beam] branch master updated: Add typescript documentation to the programing guide. (#22137)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f641c24dd0a Add typescript documentation to the programing guide. (#22137)
f641c24dd0a is described below
commit f641c24dd0a81fce28d1ad516ed9a9b87566ffa8
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Sat Jul 9 09:23:57 2022 -0700
Add typescript documentation to the programing guide. (#22137)
This covers everything through section 4, Transforms.
Also implements missing transforms CoGroupByKey and Partition and
fixes a bug in DoubleCoder.
---
.../apache_beam/examples/snippets/snippets.py | 18 +-
.../apache_beam/examples/snippets/snippets_test.py | 4 +-
sdks/typescript/package.json | 2 +-
.../src/apache_beam/coders/standard_coders.ts | 6 +-
sdks/typescript/src/apache_beam/index.ts | 1 +
.../src/apache_beam/{ => runners}/index.ts | 4 +-
.../apache_beam/transforms/group_and_combine.ts | 38 +-
.../typescript/src/apache_beam/transforms/pardo.ts | 18 +
sdks/typescript/test/docs/programming_guide.ts | 476 +++++++++++++++++++++
sdks/typescript/test/js_coders_test.ts | 19 +-
.../content/en/documentation/programming-guide.md | 344 ++++++++++++++-
.../site/layouts/shortcodes/language-switcher.html | 5 +-
12 files changed, 898 insertions(+), 37 deletions(-)
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index f60330c65de..c3d94fb3a68 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -201,8 +201,8 @@ def pipeline_options_remote():
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
- parser.add_argument('--input-file')
- parser.add_argument('--output-path')
+ parser.add_argument('--input')
+ parser.add_argument('--output')
# [END pipeline_options_define_custom]
@@ -244,8 +244,8 @@ def pipeline_options_remote():
args = beam_options.view_as(MyOptions)
with TestPipeline() as pipeline: # Use TestPipeline for testing.
- lines = pipeline | beam.io.ReadFromText(args.input_file)
- lines | beam.io.WriteToText(args.output_path)
+ lines = pipeline | beam.io.ReadFromText(args.input)
+ lines | beam.io.WriteToText(args.output)
@mock.patch('apache_beam.Pipeline', TestPipeline)
@@ -259,13 +259,11 @@ def pipeline_options_local():
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
- '--input-file',
+ '--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
- '--output-path',
- required=True,
- help='The path prefix for output files.')
+ '--output', required=True, help='The path prefix for output files.')
# [END pipeline_options_define_custom_with_help_and_default]
@@ -286,8 +284,8 @@ def pipeline_options_local():
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
- | beam.io.ReadFromText(args.input_file)
- | beam.io.WriteToText(args.output_path))
+ | beam.io.ReadFromText(args.input)
+ | beam.io.WriteToText(args.output))
# [END pipeline_options_local]
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 3f19545e175..bbfa8435d0f 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -759,8 +759,8 @@ class SnippetsTest(unittest.TestCase):
result_path = temp_path + '.result'
test_argv = [
"unused_argv[0]",
- f"--input-file={temp_path}*",
- f"--output-path={result_path}",
+ f"--input={temp_path}*",
+ f"--output={result_path}",
]
with mock.patch.object(sys, 'argv', test_argv):
fn()
diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json
index 45249e3f64a..5a201ae6321 100644
--- a/sdks/typescript/package.json
+++ b/sdks/typescript/package.json
@@ -25,7 +25,7 @@
"clean": "tsc --clean",
"worker": "node dist/src/apache_beam/worker/external_worker_service.js",
"pretest": "npm run build",
- "test": "mocha dist/test",
+ "test": "mocha dist/test dist/test/docs",
"codecovTest": "istanbul cover mocha dist/test --reporter lcovonly -- -R spec && codecov",
"prettier": "prettier --write src/",
"prettier-check": "prettier --check src/",
diff --git a/sdks/typescript/src/apache_beam/coders/standard_coders.ts b/sdks/typescript/src/apache_beam/coders/standard_coders.ts
index d15935711ce..a521d40ebda 100644
--- a/sdks/typescript/src/apache_beam/coders/standard_coders.ts
+++ b/sdks/typescript/src/apache_beam/coders/standard_coders.ts
@@ -123,9 +123,9 @@ export class DoubleCoder implements Coder<number> {
}
decode(reader: Reader, context: Context): number {
- const barr = new Uint8Array(reader.buf, reader.pos, 8);
- const dView = new DataView(barr.buffer);
- reader.float();
+ const barr = new Uint8Array(reader.buf);
+ const dView = new DataView(barr.buffer.slice(reader.pos, reader.pos + 8));
+ reader.double();
return dView.getFloat64(0, false);
}
diff --git a/sdks/typescript/src/apache_beam/index.ts b/sdks/typescript/src/apache_beam/index.ts
index f266c7d4544..fc83c43ba18 100644
--- a/sdks/typescript/src/apache_beam/index.ts
+++ b/sdks/typescript/src/apache_beam/index.ts
@@ -19,3 +19,4 @@
export * from "./pvalue";
export * from "./transforms";
export * from "./values";
+export * from "./runners";
diff --git a/sdks/typescript/src/apache_beam/index.ts b/sdks/typescript/src/apache_beam/runners/index.ts
similarity index 90%
copy from sdks/typescript/src/apache_beam/index.ts
copy to sdks/typescript/src/apache_beam/runners/index.ts
index f266c7d4544..68abb92913a 100644
--- a/sdks/typescript/src/apache_beam/index.ts
+++ b/sdks/typescript/src/apache_beam/runners/index.ts
@@ -16,6 +16,4 @@
* limitations under the License.
*/
-export * from "./pvalue";
-export * from "./transforms";
-export * from "./values";
+export { createRunner, Runner } from "./runner";
diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
index 8ca60cab570..739a3a61f9a 100644
--- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
+++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
@@ -18,8 +18,9 @@
import { KV } from "../values";
import { PTransform, PTransformClass, withName } from "./transform";
+import { flatten } from "./flatten";
import { PCollection } from "../pvalue";
-import { PValue } from "../pvalue";
+import { PValue, P } from "../pvalue";
import * as internal from "./internal";
import { count } from "./combiners";
@@ -66,6 +67,7 @@ export class GroupBy<T, K> extends PTransformClass<
) {
super();
[this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key");
+ // XXX: Actually use this.
this.keyName = typeof this.keyNames === "string" ? this.keyNames : "key";
}
@@ -297,6 +299,40 @@ function multiCombineFn(
};
}
+// TODO: Consider adding valueFn(s) rather than using the full value.
+export function coGroupBy<T, K>(
+ key: string | string[] | ((element: T) => K),
+ keyName: string | undefined = undefined
+): PTransform<
+ { [key: string]: PCollection<any> },
+ PCollection<{ key: K; values: { [key: string]: Iterable<any> } }>
+> {
+ return function coGroupBy(inputs: { [key: string]: PCollection<any> }) {
+ const [keyFn, keyNames] = extractFnAndName(key, keyName || "key");
+ keyName = typeof keyNames === "string" ? keyNames : "key";
+ const tags = [...Object.keys(inputs)];
+ const tagged = [...Object.entries(inputs)].map(([tag, pcoll]) =>
+ pcoll.map((element) => ({
+ key: keyFn(element),
+ tag,
+ element,
+ }))
+ );
+ return P(tagged)
+ .apply(flatten())
+ .apply(groupBy("key"))
+ .map(function groupValues({ key, value }) {
+ const groupedValues: { [key: string]: any[] } = Object.fromEntries(
+ tags.map((tag) => [tag, []])
+ );
+ for (const { tag, element } of value) {
+ groupedValues[tag].push(element);
+ }
+ return { key, values: groupedValues };
+ });
+ };
+}
+
// TODO: (Typescript) Can I type T as "something that has this key" and/or,
// even better, ensure it has the correct type?
// Should be possible to get rid of the cast somehow.
diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts
index 74f8383c089..02ad336dbc7 100644
--- a/sdks/typescript/src/apache_beam/transforms/pardo.ts
+++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts
@@ -208,6 +208,24 @@ export function split<T extends { [key: string]: unknown }>(
return withName(`Split(${tags})`, expandInternal);
}
+export function partition<T>(
+ partitionFn: (element: T, numPartitions: number) => number,
+ numPartitions: number
+): PTransform<PCollection<T>, PCollection<T>[]> {
+ return function partition(input: PCollection<T>) {
+ const indices = Array.from({ length: numPartitions }, (v, i) =>
+ i.toString()
+ );
+ const splits = input
+ .map((x) => {
+ const part = partitionFn(x, numPartitions);
+ return { ["" + part]: x };
+ })
+ .apply(split(indices));
+ return indices.map((ix) => splits[ix]);
+ };
+}
+
/*
* This is the root class of special parameters that can be provided in the
* context of a map or DoFn.process method. At runtime, one can invoke the
diff --git a/sdks/typescript/test/docs/programming_guide.ts b/sdks/typescript/test/docs/programming_guide.ts
new file mode 100644
index 00000000000..c6cb3131f48
--- /dev/null
+++ b/sdks/typescript/test/docs/programming_guide.ts
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as beam from "../../src/apache_beam";
+import { PCollection } from "../../src/apache_beam";
+import * as combiners from "../../src/apache_beam/transforms/combiners";
+import * as pardo from "../../src/apache_beam/transforms/pardo";
+import { assertDeepEqual } from "../../src/apache_beam/testing/assert";
+
+describe("Programming Guide Tested Samples", function () {
+ describe("Pipelines", function () {
+ it("pipelines_constructing_creating", async function () {
+ // [START pipelines_constructing_creating]
+ await beam.createRunner().run(function pipeline(root) {
+ // Use root to build a pipeline.
+ });
+ // [END pipelines_constructing_creating]
+ });
+
+ it("pipeline_options", async function () {
+ const yargs = { argv: {} };
+ // [START pipeline_options]
+ const pipeline_options = {
+ runner: "default",
+ project: "my_project",
+ };
+
+ const runner = beam.createRunner(pipeline_options);
+
+ const runnerFromCommandLineOptions = beam.createRunner(yargs.argv);
+ // [END pipeline_options]
+ });
+
+ it("pipeline_options_define_custom", async function () {
+ const yargs = { argv: {} };
+ // [START pipeline_options_define_custom]
+ const options = yargs.argv; // Or an alternative command-line parsing library.
+
+ // Use options.input and options.output during pipeline construction.
+ // [END pipeline_options_define_custom]
+ });
+
+ it("pipelines_constructing_reading", async function () {
+ const textio = require("../../src/apache_beam/io/textio");
+ // [START pipelines_constructing_reading]
+ async function pipeline(root: beam.Root) {
+ // Note that textio.ReadFromText is an AsyncPTransform.
+ const pcoll: PCollection<string> = await root.asyncApply(
+ textio.ReadFromText("path/to/text_pattern")
+ );
+ }
+ // [END pipelines_constructing_reading]
+ });
+ });
+
+ describe("PCollections", function () {
+ it("create_pcollection", async function () {
+ // [START create_pcollection]
+ function pipeline(root: beam.Root) {
+ const pcoll = root.apply(
+ beam.create([
+ "To be, or not to be: that is the question: ",
+ "Whether 'tis nobler in the mind to suffer ",
+ "The slings and arrows of outrageous fortune, ",
+ "Or to take arms against a sea of troubles, ",
+ ])
+ );
+ }
+ // [END create_pcollection]
+ await beam.createRunner().run(pipeline);
+ });
+ });
+
+ describe("Transforms", function () {
+ it("model_pardo", async function () {
+ // [START model_pardo_pardo]
+ function computeWordLengthFn(): beam.DoFn<string, number> {
+ return {
+ process: function* (element) {
+ yield element.length;
+ },
+ };
+ }
+ // [END model_pardo_pardo]
+ await beam.createRunner().run((root: beam.Root) => {
+ const words = root.apply(beam.create(["a", "bb", "cccc"]));
+ // [START model_pardo_apply]
+ const result = words.apply(beam.parDo(computeWordLengthFn()));
+ // [END model_pardo_apply]
+ result.apply(assertDeepEqual([1, 2, 4]));
+ });
+ });
+
+ it("model_pardo_using_flatmap", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const words = root.apply(beam.create(["a", "bb", "cccc"]));
+ // [START model_pardo_using_flatmap]
+ const result = words.flatMap((word) => [word.length]);
+ // [END model_pardo_using_flatmap]
+ result.apply(assertDeepEqual([1, 2, 4]));
+ });
+ });
+
+ it("model_pardo_using_map", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const words = root.apply(beam.create(["a", "bb", "cccc"]));
+ // [START model_pardo_using_map]
+ const result = words.map((word) => word.length);
+ // [END model_pardo_using_map]
+ result.apply(assertDeepEqual([1, 2, 4]));
+ });
+ });
+
+ it("groupby", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const cats = [
+ { word: "cat", score: 1 },
+ { word: "cat", score: 5 },
+ ];
+ const dogs = [{ word: "dog", score: 5 }];
+ function sortValues({ key, value }) {
+ return { key, value: value.sort() };
+ }
+
+ const scores = root.apply(beam.create(cats.concat(dogs)));
+ // [START groupby]
+ // This will produce a PCollection with elements like
+ // {key: "cat", value: [{ word: "cat", score: 1 },
+ // { word: "cat", score: 5 }, ...]}
+ // {key: "dog", value: [{ word: "dog", score: 5 }, ...]}
+ const grouped_by_word = scores.apply(beam.groupBy("word"));
+
+ // This will produce a PCollection with elements like
+ // {key: 3, value: [{ word: "cat", score: 1 },
+ // { word: "dog", score: 5 },
+ // { word: "cat", score: 5 }, ...]}
+ const by_word_length = scores.apply(beam.groupBy((x) => x.word.length));
+ // [END groupby]
+
+ // XXX Should it be {key, values} rather than {key, value}?
+ grouped_by_word //
+ .map(sortValues)
+ .apply(
+ assertDeepEqual([
+ { key: "cat", value: cats },
+ { key: "dog", value: dogs },
+ ])
+ );
+
+ by_word_length
+ .map(sortValues)
+ .apply(
+ assertDeepEqual([{ key: 3, value: cats.concat(dogs).sort() }])
+ );
+ });
+ });
+
+ it("cogroupbykey", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ // [START cogroupbykey_inputs]
+ const emails_list = [
+ { name: "amy", email: "amy@example.com" },
+ { name: "carl", email: "carl@example.com" },
+ { name: "julia", email: "julia@example.com" },
+ { name: "carl", email: "carl@email.com" },
+ ];
+ const phones_list = [
+ { name: "amy", phone: "111-222-3333" },
+ { name: "james", phone: "222-333-4444" },
+ { name: "amy", phone: "333-444-5555" },
+ { name: "carl", phone: "444-555-6666" },
+ ];
+
+ const emails = root.apply(beam.create(emails_list));
+ const phones = root.apply(beam.create(phones_list));
+ // [END cogroupbykey_inputs]
+
+ // [START cogroupbykey_raw_outputs]
+ const results = [
+ {
+ name: "amy",
+ values: {
+ emails: [{ name: "amy", email: "amy@example.com" }],
+ phones: [
+ { name: "amy", phone: "111-222-3333" },
+ { name: "amy", phone: "333-444-5555" },
+ ],
+ },
+ },
+ {
+ name: "carl",
+ values: {
+ emails: [
+ { name: "carl", email: "carl@example.com" },
+ { name: "carl", email: "carl@email.com" },
+ ],
+ phones: [{ name: "carl", phone: "444-555-6666" }],
+ },
+ },
+ {
+ name: "james",
+ values: {
+ emails: [],
+ phones: [{ name: "james", phone: "222-333-4444" }],
+ },
+ },
+ {
+ name: "julia",
+ values: {
+ emails: [{ name: "julia", email: "julia@example.com" }],
+ phones: [],
+ },
+ },
+ ];
+ // [END cogroupbykey_raw_outputs]
+
+ // [START cogroupbykey]
+ const formatted_results_pcoll = beam
+ .P({ emails, phones })
+ .apply(beam.coGroupBy("name"))
+ .map(function formatResults({ key, values }) {
+ const emails = values.emails.map((x) => x.email).sort();
+ const phones = values.phones.map((x) => x.phone).sort();
+ return `${key}; [${emails}]; [${phones}]`;
+ });
+ // [END cogroupbykey]
+
+ // [START cogroupbykey_formatted_outputs]
+ const formatted_results = [
+ "amy; [amy@example.com]; [111-222-3333,333-444-5555]",
+ "carl; [carl@email.com,carl@example.com]; [444-555-6666]",
+ "james; []; [222-333-4444]",
+ "julia; [julia@example.com]; []",
+ ];
+ // [END cogroupbykey_formatted_outputs]
+
+ formatted_results_pcoll.apply(assertDeepEqual(formatted_results));
+ });
+ });
+
+ it("combine_simple_sum", async function () {
+ // prettier-ignore
+ await beam.createRunner().run((root: beam.Root) => {
+ // [START combine_simple_sum]
+ const pcoll = root.apply(beam.create([1, 10, 100, 1000]));
+ const result = pcoll.apply(
+ beam
+ .groupGlobally()
+ .combining((c) => c, (x, y) => x + y, "sum")
+ .combining((c) => c, (x, y) => x * y, "product")
+ );
+ const expected = { sum: 1111, product: 1000000 }
+ // [END combine_simple_sum]
+ result.apply(assertDeepEqual([expected]));
+ });
+ });
+
+ it("combine_custom_average", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ // [START combine_custom_average]
+ const meanCombineFn: beam.CombineFn<number, [number, number], number> =
+ {
+ createAccumulator: () => [0, 0],
+ addInput: ([sum, count]: [number, number], i: number) => [
+ sum + i,
+ count + 1,
+ ],
+ mergeAccumulators: (accumulators: [number, number][]) =>
+ accumulators.reduce(([sum0, count0], [sum1, count1]) => [
+ sum0 + sum1,
+ count0 + count1,
+ ]),
+ extractOutput: ([sum, count]: [number, number]) => sum / count,
+ };
+ // [END combine_custom_average]
+ // [START combine_global_average]
+ const pcoll = root.apply(beam.create([4, 5, 6]));
+ const result = pcoll.apply(
+ beam.groupGlobally().combining((c) => c, meanCombineFn, "mean")
+ );
+ // [END combine_global_average]
+ result.apply(assertDeepEqual([{ mean: 5 }]));
+ });
+ });
+
+ it("combine_per_key", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ // [START combine_per_key]
+ const pcoll = root.apply(
+ beam.create([
+ { player: "alice", accuracy: 1.0 },
+ { player: "bob", accuracy: 0.99 },
+ { player: "eve", accuracy: 0.5 },
+ { player: "eve", accuracy: 0.25 },
+ ])
+ );
+ const result = pcoll.apply(
+ beam
+ .groupBy("player")
+ .combining("accuracy", combiners.mean, "mean")
+ .combining("accuracy", combiners.max, "max")
+ );
+ const expected = [
+ { player: "alice", mean: 1.0, max: 1.0 },
+ { player: "bob", mean: 0.99, max: 0.99 },
+ { player: "eve", mean: 0.375, max: 0.5 },
+ ];
+ // [END combine_per_key]
+ result.apply(assertDeepEqual(expected));
+ });
+ });
+
+ it("model_multiple_pcollections_flatten", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ // [START model_multiple_pcollections_flatten]
+ const fib = root.apply(beam.create([1, 1, 2, 3, 5, 8]));
+ const pow = root.apply(beam.create([1, 2, 4, 8, 16, 32]));
+ const result = beam.P([fib, pow]).apply(beam.flatten());
+ // [END model_multiple_pcollections_flatten]
+ result.apply(assertDeepEqual([1, 1, 1, 2, 2, 3, 4, 5, 8, 8, 16, 32]));
+ });
+ });
+
+ it("model_multiple_pcollections_partition", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ type Student = number;
+ function getPercentile(s: Student) {
+ return s;
+ }
+ const students = root.apply(beam.create([1, 5, 50, 98, 99]));
+ // [START model_multiple_pcollections_partition]
+ const deciles: PCollection<Student>[] = students.apply(
+ beam.partition(
+ (student, numPartitions) =>
+ Math.floor((getPercentile(student) / 100) * numPartitions),
+ 10
+ )
+ );
+ const topDecile: PCollection<Student> = deciles[9];
+ // [END model_multiple_pcollections_partition]
+ topDecile.apply(assertDeepEqual([98, 99]));
+ });
+ });
+
+ it("model_pardo_side_input", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const words = root.apply(beam.create(["a", "bb", "cccc"]));
+ // [START model_pardo_side_input]
+ // meanLengthPColl will contain a single number whose value is the
+ // average length of the words
+ const meanLengthPColl: PCollection<number> = words
+ .apply(
+ beam
+ .groupGlobally<string>()
+ .combining((word) => word.length, combiners.mean, "mean")
+ )
+ .map(({ mean }) => mean);
+
+ // Now we use this as a side input to yield only words that are
+ // smaller than average.
+ const smallWords = words.flatMap(
+ // This is the function, taking context as a second argument.
+ function* keepSmall(word, context) {
+ if (word.length < context.meanLength.lookup()) {
+ yield word;
+ }
+ },
+ // This is the context that will be passed as a second argument.
+ { meanLength: pardo.singletonSideInput(meanLengthPColl) }
+ );
+ // [END model_pardo_side_input]
+ smallWords.apply(assertDeepEqual(["a", "bb"]));
+ });
+ });
+
+ it("model_multiple_output", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const words = root.apply(beam.create(["a", "bb", "cccccc", "xx"]));
+ function isMarkedWord(word) {
+ return word.includes("x");
+ }
+
+ // [START model_multiple_output_dofn]
+ const to_split = words.flatMap(function* (word) {
+ if (word.length < 5) {
+ yield { below: word };
+ } else {
+ yield { above: word };
+ }
+ if (isMarkedWord(word)) {
+ yield { marked: word };
+ }
+ });
+ // [END model_multiple_output_dofn]
+
+ // [START model_multiple_output]
+ const { below, above, marked } = to_split.apply(
+ beam.split(["below", "above", "marked"])
+ );
+ // [END model_multiple_output]
+
+ below.apply(assertDeepEqual(["a", "bb", "xx"]));
+ above.apply(assertDeepEqual(["cccccc"]));
+ marked.apply(assertDeepEqual(["xx"]));
+ });
+ });
+
+ it("other_params", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const pcoll = root.apply(beam.create(["a", "b", "c"]));
+ {
+ // [START timestamp_param]
+ function processFn(element, context) {
+ return context.timestamp.lookup();
+ }
+
+ pcoll.map(processFn, { timestamp: pardo.timestampParam() });
+ // [END timestamp_param]
+ }
+
+ {
+ function processFn(element, context) {}
+
+ // [START window_param]
+ pcoll.map(processFn, { timestamp: pardo.windowParam() });
+ // [END window_param]
+
+ // [START pane_info_param]
+ pcoll.map(processFn, { timestamp: pardo.paneInfoParam() });
+ // [END pane_info_param]
+ }
+ });
+ });
+
+ it("countwords_composite", async function () {
+ await beam.createRunner().run((root: beam.Root) => {
+ const lines = root.apply(beam.create(["a b", "b c c c"]));
+ // [START countwords_composite]
+ function countWords(lines: PCollection<string>) {
+ return lines //
+ .map((s: string) => s.toLowerCase())
+ .flatMap(function* splitWords(line: string) {
+ yield* line.split(/[^a-z]+/);
+ })
+ .apply(beam.countPerElement());
+ }
+
+ const counted = lines.apply(countWords);
+ // [END countwords_composite]
+ counted.apply(
+ assertDeepEqual([
+ { element: "a", count: 1 },
+ { element: "b", count: 2 },
+ { element: "c", count: 3 },
+ ])
+ );
+ });
+ });
+ //
+ });
+});
diff --git a/sdks/typescript/test/js_coders_test.ts b/sdks/typescript/test/js_coders_test.ts
index 18e19f893c8..308bd48713e 100644
--- a/sdks/typescript/test/js_coders_test.ts
+++ b/sdks/typescript/test/js_coders_test.ts
@@ -108,7 +108,7 @@ describe("JavaScript native coders", function () {
inputObject
);
});
- it("encodes and decodes a number properly", function () {
+ it("encodes and decodes an integer properly", function () {
const inputObject = 12345678;
const writer = new BufferWriter();
@@ -125,6 +125,23 @@ describe("JavaScript native coders", function () {
inputObject
);
});
+ it("encodes and decodes a float properly", function () {
+ const inputObject = 0.12345678;
+ const writer = new BufferWriter();
+
+ const encoded = objCoder.encode(
+ inputObject,
+ writer,
+ Context.needsDelimiters
+ );
+
+ const buffer = writer.finish();
+ const reader = new BufferReader(buffer);
+ assert.deepEqual(
+ objCoder.decode(reader, Context.needsDelimiters),
+ inputObject
+ );
+ });
it("encodes and decodes a BigInt properly", function () {
// TODO(pabloem): THIS TEST NEEDS TO BE sKIPPED BECAUSE VERY LARGE INTS ARE BROKEN
this.skip();
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index cb9d47f9e8c..eeae46e0639 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -32,7 +32,7 @@ If you want a brief introduction to Beam's basic concepts before reading the
programming guide, take a look at the
[Basics of the Beam model](/documentation/basics/) page.
-{{< language-switcher java py go >}}
+{{< language-switcher java py go typescript >}}
{{< paragraph class="language-py" >}}
The Python SDK supports Python 3.7, 3.8, and 3.9.
@@ -42,6 +42,10 @@ The Python SDK supports Python 3.7, 3.8, and 3.9.
The Go SDK supports Go v1.18+. SDK release 2.32.0 is the last experimental version.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+The Typescript SDK supports Node v16+ and is still experimental.
+{{< /paragraph >}}
+
## 1. Overview {#overview}
To use Beam, you need to first create a driver program using the classes in one
@@ -131,6 +135,11 @@ your pipeline's configuration options programmatically, but it's often easier to
set the options ahead of time (or read them from the command line) and pass them
to the `Pipeline` object when you create the object.
+<span class="language-typescript">
+A Pipeline in the Typescript API is simply a function that will be called
+with a single `root` object and is passed to a Runner's `run` method.
+</span>
+
{{< highlight java >}}
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
@@ -147,6 +156,10 @@ Pipeline p = Pipeline.create(options);
{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipelines_constructing_creating >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" pipelines_constructing_creating >}}
+{{< /highlight >}}
+
### 2.1. Configuring pipeline options {#configuring-pipeline-options}
Use the pipeline options to configure different aspects of your pipeline, such
@@ -179,6 +192,12 @@ Use Go flags to parse command line arguments to configure your pipeline. Flags m
before `beam.Init()` is called.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+Any JavaScript object can be used as pipeline options.
+One can construct one manually, but it is also common to pass an object
+created from command line options such as `yargs.argv`.
+{{< /paragraph >}}
+
{{< highlight java >}}
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();
@@ -192,6 +211,10 @@ PipelineOptions options =
{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipeline_options >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" pipeline_options >}}
+{{< /highlight >}}
+
This interprets command-line arguments that follow the format:
```
@@ -221,9 +244,12 @@ Defining flag variables this way lets you specify any of the options as a comman
#### 2.1.2. Creating custom options {#creating-custom-options}
You can add your own custom options in addition to the standard
-`PipelineOptions`. To add your own options, define an interface with getter and
-setter methods for each option, as in the following example for
-adding `input` and `output` custom options:
+`PipelineOptions`.
+{{< paragraph class="language-java" >}}
+To add your own options, define an interface with getter and
+setter methods for each option.
+{{< /paragraph >}}
+The following example shows how to add `input` and `output` custom options:
{{< highlight java >}}
public interface MyOptions extends PipelineOptions {
@@ -243,10 +269,16 @@ public interface MyOptions extends PipelineOptions {
{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipeline_options_define_custom >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" pipeline_options_define_custom >}}
+{{< /highlight >}}
+
You can also specify a description, which appears when a user passes `--help` as
a command-line argument, and a default value.
+{{< paragraph class="language-java language-py language-go" >}}
You set the description and default value using annotations, as follows:
+{{< /paragraph >}}
{{< highlight java >}}
public interface MyOptions extends PipelineOptions {
@@ -270,6 +302,12 @@ public interface MyOptions extends PipelineOptions {
{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipeline_options_define_custom_with_help_and_default >}}
{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+For Python, you can also simply parse your custom options with argparse; there
+is no need to create a separate PipelineOptions subclass.
+{{< /paragraph >}}
+
{{< paragraph class="language-java" >}}
It's recommended that you register your interface with `PipelineOptionsFactory`
and then pass the interface when creating the `PipelineOptions` object. When you
@@ -329,12 +367,16 @@ Each data source adapter has a `Read` transform; to read, you must apply that
transform to the `Pipeline` object itself.
<span class="language-java">`TextIO.Read`</span>
<span class="language-py">`io.TextFileSource`</span>
-<span class="language-go">`textio.Read`</span>, for example, reads from an
+<span class="language-go">`textio.Read`</span>
+<span class="language-typescript">`textio.ReadFromText`</span>,
+for example, reads from an
external text file and returns a `PCollection` whose elements are of type
`String`, each `String` represents one line from the text file. Here's how you
would apply <span class="language-java">`TextIO.Read`</span>
<span class="language-py">`io.TextFileSource`</span>
-<span class="language-go">`textio.Read`</span> to your `Pipeline` to create
+<span class="language-go">`textio.Read`</span>
+<span class="language-typescript">`textio.ReadFromText`</span>
+to your `Pipeline` <span class="language-typescript">root</span> to create
a `PCollection`:
{{< highlight java >}}
@@ -358,6 +400,10 @@ public static void main(String[] args) {
{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipelines_constructing_reading >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" pipelines_constructing_reading >}}
+{{< /highlight >}}
+
See the [section on I/O](#pipeline-io) to learn more about how to read from the
various data sources supported by the Beam SDK.
@@ -386,8 +432,16 @@ To create a `PCollection` from an in-memory `slice`, you use the Beam-provided
`beam.CreateList` transform. Pass the pipeline `scope`, and the `slice` to this transform.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+To create a `PCollection` from an in-memory `array`, you use the Beam-provided
+`Create` transform. Apply this transform directly to your `Root` object.
+{{< /paragraph >}}
+
The following example code shows how to create a `PCollection` from an in-memory
-<span class="language-java">`List`</span><span class="language-py">`list`</span><span class="language-go">`slice`</span>:
+<span class="language-java">`List`</span>
+<span class="language-py">`list`</span>
+<span class="language-go">`slice`</span>
+<span class="language-typescript">`array`</span>:
{{< highlight java >}}
public static void main(String[] args) {
@@ -416,6 +470,10 @@ public static void main(String[] args) {
{{< code_sample "sdks/go/examples/snippets/01_03intro.go" model_pcollection >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" create_pcollection >}}
+{{< /highlight >}}
+
### 3.2. PCollection characteristics {#pcollection-characteristics}
A `PCollection` is owned by the specific `Pipeline` object for which it is
@@ -557,7 +615,12 @@ the transform itself as an argument, and the operation returns the output
[Output PCollection] := beam.ParDo(scope, [Transform], [Input PCollection])
{{< /highlight >}}
-{{< paragraph class="language-java language-py" >}}
+{{< highlight typescript >}}
+[Output PCollection] = [Input PCollection].apply([Transform])
+[Output PCollection] = await [Input PCollection].asyncApply([AsyncTransform])
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py language-typescript" >}}
Because Beam uses a generic `apply` method for `PCollection`, you can both chain
transforms sequentially and also apply transforms that contain other transforms
nested within (called [composite transforms](#composite-transforms) in the Beam
@@ -600,6 +663,12 @@ For example, you can successively call transforms on PCollections to modify the
[Final Output PCollection] := beam.ParDo(scope, [Third Transform], [Third PCollection])
{{< /highlight >}}
+{{< highlight typescript >}}
+[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
+.apply([Second Transform])
+.apply([Third Transform])
+{{< /highlight >}}
+
The graph of this pipeline looks like the following:
![This linear pipeline starts with one input collection, sequentially applies
@@ -630,6 +699,12 @@ a branching pipeline, like so:
[PCollection of 'B' names] = beam.ParDo(scope, [Transform B], [PCollection of database table rows])
{{< /highlight >}}
+{{< highlight typescript >}}
+[PCollection of database table rows] = [Database Table Reader].apply([Read Transform])
+[PCollection of 'A' names] = [PCollection of database table rows].apply([Transform A])
+[PCollection of 'B' names] = [PCollection of database table rows].apply([Transform B])
+{{< /highlight >}}
+
The graph of this branching pipeline looks like the following:
![This pipeline applies two transforms to a single input collection. Each
@@ -643,6 +718,27 @@ nest multiple transforms inside a single, larger transform. Composite transforms
are particularly useful for building a reusable sequence of simple steps that
get used in a lot of different places.
+{{< paragraph class="language-py" >}}
+The pipe syntax allows one to apply PTransforms to `tuple`s and `dict`s of
+PCollections as well for those transforms accepting multiple inputs (such as
+`Flatten` and `CoGroupByKey`).
+{{< /paragraph >}}
+
+{{< paragraph class="language-typescript" >}}
+PTransforms can also be applied to any `PValue`, which includes the Root object,
+PCollections, arrays of `PValue`s, and objects with `PValue` values.
+One can apply transforms to these composite types by wrapping them with
+`beam.P`, e.g.
+`beam.P({left: pcollA, right: pcollB}).apply(transformExpectingTwoPCollections)`.
+{{< /paragraph >}}
+
+{{< paragraph class="language-typescript" >}}
+PTransforms come in two flavors, synchronous and asynchronous, depending on
+whether their *application* involves asynchronous invocations.
+An `AsyncTransform` must be applied with `asyncApply` and returns a `Promise`
+which must be awaited before further pipeline construction.
+{{< /paragraph >}}
+
### 4.2. Core Beam transforms {#core-beam-transforms}
Beam provides the following core transforms, each of which represents a different
@@ -655,6 +751,11 @@ processing paradigm:
* `Flatten`
* `Partition`
+{{< paragraph class="language-typescript" >}}
+The Typescript SDK provides some of the most basic of these transforms
+as methods on `PCollection` itself.
+{{< /paragraph >}}
+
#### 4.2.1. ParDo {#pardo}
`ParDo` is a Beam transform for generic parallel processing. The `ParDo`
@@ -766,6 +867,15 @@ var words beam.PCollection = ...
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+// The input PCollection of strings.
+const words : PCollection<string> = ...
+
+// The DoFn to perform on each element in the input PCollection.
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_pardo_pardo >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_pardo_apply >}}
+{{< /highlight >}}
+
In the example, our input `PCollection` contains <span class="language-java language-py">`String`</span>
<span class="language-go">`string`</span> values. We apply a
`ParDo` transform that specifies a function (`ComputeWordLengthFn`) to compute
@@ -839,7 +949,7 @@ and output types of your `DoFn` or the framework will raise an error. Note: `@El
`ProcessContext` parameter should be used instead.
{{< /paragraph >}}
-{{< paragraph class="language-py" >}}
+{{< paragraph class="language-py language-typescript" >}}
Inside your `DoFn` subclass, you'll write a method `process` where you provide
the actual processing logic. You don't need to manually extract the elements
from the input collection; the Beam SDKs handle that for you. Your `process` method
@@ -877,6 +987,10 @@ static class ComputeWordLengthFn extends DoFn<String, Integer> {
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_pardo >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_pardo_pardo >}}
+{{< /highlight >}}
+
{{< paragraph class="language-go" >}}
Simple DoFns can also be written as functions.
{{< /paragraph >}}
@@ -939,7 +1053,7 @@ following requirements:
</span>
-<span class="language-py">
+<span class="language-py language-typescript">
* You should not in any way modify the `element` argument provided to the
`process` method, or any side inputs.
@@ -964,13 +1078,15 @@ If your function is relatively straightforward, you can simplify your use of
`ParDo` by providing a lightweight `DoFn` in-line, as
<span class="language-java">an anonymous inner class instance</span>
<span class="language-py">a lambda function</span>
-<span class="language-go">an anonymous function</span>.
+<span class="language-go">an anonymous function</span>
+<span class="language-typescript">a function passed to `PCollection.map` or `PCollection.flatMap`</span>.
Here's the previous example, `ParDo` with `ComputeLengthWordsFn`, with the
`DoFn` specified as
<span class="language-java">an anonymous inner class instance</span>
<span class="language-py">a lambda function</span>
-<span class="language-go">an anonymous function</span>:
+<span class="language-go">an anonymous function</span>
+<span class="language-typescript">a function</span>:
{{< highlight java >}}
// The input PCollection.
@@ -1006,16 +1122,22 @@ lengths := beam.ParDo(s, func (word string, emit func(int)) {
}, words)
{{< /highlight >}}
+{{< highlight typescript >}}
+// The input PCollection of strings.
+words = ...
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_pardo_using_flatmap >}}
+{{< /highlight >}}
+
If your `ParDo` performs a one-to-one mapping of input elements to output
elements--that is, for each input element, it applies a function that produces
*exactly one* output element, <span class="language-go">you can return that
element directly.</span><span class="language-java language-py">you can use the higher-level
-<span class="language-java">`MapElements`</span><span class="language-py">`Map`</span>
+<span class="language-java">`MapElements`</span><span class="language-py language-typescript">`Map`</span>
transform.</span><span class="language-java">`MapElements` can accept an anonymous
Java 8 lambda function for additional brevity.</span>
Here's the previous example using <span class="language-java">`MapElements`</span>
-<span class="language-py">`Map`</span><span class="language-go">a direct return</span>:
+<span class="language-py language-typescript">`Map`</span><span class="language-go">a direct return</span>:
{{< highlight java >}}
// The input PCollection.
@@ -1044,6 +1166,12 @@ var words beam.PCollection = ...
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply_anon >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+// The input PCollection of string.
+words = ...
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_pardo_using_map >}}
+{{< /highlight >}}
+
<span class="language-java" >
> **Note:** You can use Java 8 lambda functions with several other Beam
@@ -1129,6 +1257,14 @@ individual values) to a uni-map (unique keys to collections of values).
<span class="language-java">Using `GroupByKey` is straightforward:</span>
+{{< paragraph class="language-py language-typescript" >}}
+While all SDKs have a `GroupByKey` transform, using `GroupBy` is
+generally more natural.
+The `GroupBy` transform can be parameterized by the name(s) of properties
+on which to group the elements of the PCollection, or a function taking
+each element as input that maps to a key on which to do grouping.
+{{< /paragraph >}}
+
{{< highlight java >}}
// The input PCollection.
PCollection<KV<String, String>> mapped = ...;
@@ -1139,10 +1275,27 @@ PCollection<KV<String, Iterable<String>>> reduced =
mapped.apply(GroupByKey.<String, String>create());
{{< /highlight >}}
+{{< highlight py >}}
+# The input PCollection of (`string`, `int`) tuples.
+words_and_counts = ...
+
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_group_by_key_transform >}}
+
+
+{{< /highlight >}}
+
{{< highlight go >}}
{{< code_sample "sdks/go/examples/snippets/04transforms.go" groupbykey >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+// A PCollection of elements like
+// {word: "cat", score: 1}, {word: "dog", score: 5}, {word: "cat", score: 5}, ...
+const scores : PCollection<{word: string, score: number}> = ...
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" groupby >}}
+{{< /highlight >}}
+
+
##### 4.2.2.1 GroupByKey and unbounded PCollections {#groupbykey-and-unbounded-pcollections}
If you are using unbounded `PCollection`s, you must use either [non-global
@@ -1246,6 +1399,10 @@ data contains names and phone numbers.
{{< code_sample "sdks/go/examples/snippets/04transforms.go" cogroupbykey_inputs >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" cogroupbykey_inputs >}}
+{{< /highlight >}}
+
After `CoGroupByKey`, the resulting data contains all data associated with each
unique key from any of the input collections.
@@ -1262,7 +1419,11 @@ unique key from any of the input collections.
{{< code_sample "sdks/go/examples/snippets/04transforms_test.go" cogroupbykey_outputs >}}
{{< /highlight >}}
-{{< paragraph class="language-java language-py" >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" cogroupbykey_raw_outputs >}}
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py language-typescript" >}}
The following code example joins the two `PCollection`s with `CoGroupByKey`,
followed by a `ParDo` to consume the result. Then, the code uses tags to look up
and format data from each collection.
@@ -1287,6 +1448,10 @@ parameters maps to the ordering of the `CoGroupByKey` inputs.
{{< code_sample "sdks/go/examples/snippets/04transforms_test.go" cogroupbykey_outputs >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" cogroupbykey >}}
+{{< /highlight >}}
+
The formatted data looks like this:
{{< highlight java >}}
@@ -1301,11 +1466,16 @@ The formatted data looks like this:
{{< code_sample "sdks/go/examples/snippets/04transforms_test.go" cogroupbykey_formatted_outputs >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" cogroupbykey_formatted_outputs >}}
+{{< /highlight >}}
+
#### 4.2.4. Combine {#combine}
<span class="language-java">[`Combine`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Combine.html)</span>
<span class="language-py">[`Combine`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span>
<span class="language-go">[`Combine`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L27)</span>
+<span class="language-typescript">[`Combine`](https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts)</span>
is a Beam transform for combining collections of elements or values in your
data. `Combine` has variants that work on entire `PCollection`s, and some that
combine the values for each key in `PCollection`s of key/value pairs.
@@ -1341,6 +1511,13 @@ automatically apply some optimizations:
##### 4.2.4.1. Simple combinations using simple functions {#simple-combines}
The following example code shows a simple combine function.
+<span class="language-typescript">
+Combining is done by modifying a grouping transform with the `combining` method.
+This method takes three parameters: the value to combine (either as a named
+property of the input elements, or a function of the entire input),
+the combining operation (either a binary function or a `CombineFn`),
+and finally a name for the combined value in the output object.
+</span>
{{< highlight java >}}
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
@@ -1384,6 +1561,10 @@ the type of the output.
{{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" combine_simple_sum >}}
+{{< /highlight >}}
+
##### 4.2.4.2. Advanced combinations using CombineFn {#advanced-combines}
For more complex combine functions, you can define a
@@ -1463,6 +1644,10 @@ pc = ...
{{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_custom_average >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" combine_custom_average >}}
+{{< /highlight >}}
+
<span class="language-go">
> **Note**: Only `MergeAccumulators` is a required method. The others will have a default interpretation
@@ -1498,6 +1683,10 @@ pc = ...
{{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_global_average >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" combine_global_average >}}
+{{< /highlight >}}
+
##### 4.2.4.4. Combine and global windowing {#combine-global-windowing}
If your input `PCollection` uses the default global windowing, the default
@@ -1605,11 +1794,16 @@ playerAccuracies := ... // PCollection<string,int>
// avgAccuracyPerPlayer is a PCollection<string,float64>
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" combine_per_key >}}
+{{< /highlight >}}
+
#### 4.2.5. Flatten {#flatten}
<span class="language-java">[`Flatten`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Flatten.html)</span>
<span class="language-py">[`Flatten`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span>
<span class="language-go">[`Flatten`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/flatten.go)</span>
+<span class="language-typescript">`Flatten`</span>
is a Beam transform for `PCollection` objects that store the same data type.
`Flatten` merges multiple `PCollection` objects into a single logical
`PCollection`.
@@ -1641,6 +1835,12 @@ PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_multiple_pcollections_flatten >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+// Flatten takes an array of PCollection objects, wrapped in beam.P(...)
+// Returns a single PCollection that contains a union of all of the elements in all input PCollections.
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_multiple_pcollections_flatten >}}
+{{< /highlight >}}
+
##### 4.2.5.1. Data encoding in merged collections {#data-encoding-merged-collections}
By default, the coder for the output `PCollection` is the same as the coder for
@@ -1665,10 +1865,15 @@ pipeline is constructed.
<span class="language-java">[`Partition`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Partition.html)</span>
<span class="language-py">[`Partition`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span>
<span class="language-go">[`Partition`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/partition.go)</span>
+<span class="language-typescript">`Partition`</span>
is a Beam transform for `PCollection` objects that store the same data
type. `Partition` splits a single `PCollection` into a fixed number of smaller
collections.
+{{< paragraph class="language-typescript" >}}
+Often in the Typescript SDK the `Split` transform is more natural to use.
+{{< /paragraph >}}
+
`Partition` divides the elements of a `PCollection` according to a partitioning
function that you provide. The partitioning function contains the logic that
determines how to split up the elements of the input `PCollection` into each
@@ -1713,6 +1918,10 @@ students = ...
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_multiple_pcollections_partition >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_multiple_pcollections_partition >}}
+{{< /highlight >}}
+
### 4.3. Requirements for writing user code for Beam transforms {#requirements-for-writing-user-code-for-beam-transforms}
When you build user code for a Beam transform, you should keep in mind the
@@ -1765,6 +1974,20 @@ they are registered with `register.FunctionXxY` (for simple functions) or
`register.DoFnXxY` (for sturctural DoFns), and are not closures. Structural
`DoFn`s will have all exported fields serialized. Unexported fields are unable to
be serialized, and will be silently ignored.</span>
+<span class="language-typescript">
+The Typescript SDK uses [ts-serialize-closures](https://github.com/nokia/ts-serialize-closures)
+to serialize functions (and other objects).
+This works out of the box for functions that are not closures, and also works
+for closures as long as the function in question (and any closures it references)
+are compiled with the
+[`ts-closure-transform` hooks](https://github.com/apache/beam/blob/master/sdks/typescript/tsconfig.json)
+(e.g. by using `ttsc` in place of `tsc`).
+One can alternatively call
+`requireForSerialization("importableModuleDefiningFunc", {func})`
+to [register a function directly](https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/serialization.ts) by name which can be less error-prone.
+Note that if, as is often the case in Javascript, `func` returns objects that
+contain closures, it is not sufficient to register `func` alone--its return
+value must be registered if used.</span>
Some other serializability factors you should keep in mind are:
@@ -1893,6 +2116,19 @@ words = ...
// on how to contribute them!
{{< /highlight >}}
+{{< highlight typescript >}}
+// Side inputs are provided by passing an extra context object to
+// `map`, `flatMap`, or `parDo` transforms. This object will get passed as an
+// extra argument to the provided function (or `process` method of the `DoFn`).
+// `SideInputParam` properties (generally created with `pardo.xxxSideInput(...)`)
+// have a `lookup` method that can be invoked from within the process method.
+
+// Let words be a PCollection of strings.
+const words : PCollection<string> = ...
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_pardo_side_input >}}
+{{< /highlight >}}
+
+
#### 4.4.2. Side inputs and windowing {#side-inputs-windowing}
A windowed `PCollection` may be infinite and thus cannot be compressed into a
@@ -1941,8 +2177,24 @@ function that matches the number of outputs. `beam.ParDo2` for two output `PColl
use `beam.ParDoN` which will return a `[]beam.PCollection`.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+`ParDo` always produces a main output `PCollection` (as the return value
+from `apply`). If you want to have multiple outputs, emit an object with distinct
+properties in your `ParDo` operation and follow this operation with a `Split`
+to break it into multiple `PCollection`s.
+{{< /paragraph >}}
+
#### 4.5.1. Tags for multiple outputs {#output-tags}
+{{< paragraph class="language-typescript" >}}
+The `Split` PTransform will take a PCollection of elements of the form
+`{tagA?: A, tagB?: B, ...}` and return an object
+`{tagA: PCollection<A>, tagB: PCollection<B>, ...}`.
+The set of expected tags is passed to the operation; how multiple or
+unknown tags are handled can be specified by passing a non-default
+`SplitOptions` instance.
+{{< /paragraph >}}
+
{{< paragraph class="language-go" >}}
The Go SDK doesn't use output tags, and instead uses positional ordering for
multiple output PCollections.
@@ -2014,13 +2266,20 @@ multiple output PCollections.
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_multiple_output >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+// Create three PCollections from a single input PCollection.
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_multiple_output >}}
+{{< /highlight >}}
+
#### 4.5.2. Emitting to multiple outputs in your DoFn {#multiple-outputs-dofn}
{{< paragraph class="language-go" >}}
Call emitter functions as needed to produce 0 or more elements for its matching
`PCollection`. The same value can be emitted with multiple emitters.
As normal, do not mutate values after emitting them from any emitter.
+{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
All emitters should be registered using a generic `register.EmitterX[...]`
function. This optimizes runtime execution of the emitter.
{{< /paragraph >}}
@@ -2071,6 +2330,10 @@ Other emitters output to their own PCollections in their defined parameter order
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_multiple_output_dofn >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" model_multiple_output_dofn >}}
+{{< /highlight >}}
+
#### 4.5.3. Accessing additional parameters in your DoFn {#other-dofn-parameters}
{{< paragraph class="language-java" >}}
@@ -2083,6 +2346,11 @@ In addition to the element, Beam will populate other parameters to your DoFn's `
Any combination of these parameters can be added to your process method in any order.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+In addition to the element, Beam will populate other parameters to your DoFn's `process` method.
+These are available by placing accessors in the context argument, just as for side inputs.
+{{< /paragraph >}}
+
{{< paragraph class="language-go" >}}
In addition to the element, Beam will populate other parameters to your DoFn's `ProcessElement` method.
Any combination of these parameters can be added to your process method in a standard order.
@@ -2113,6 +2381,11 @@ To access the timestamp of an input element, add a keyword parameter default to
To access the timestamp of an input element, add a `beam.EventTime` parameter before the element. For example:
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+**Timestamp:**
+To access the timestamp of an input element, add a `pardo.timestampParam()` to the context argument.
+{{< /paragraph >}}
+
{{< highlight java >}}
.of(new DoFn<String, String>() {
public void processElement(@Element String word, @Timestamp Instant timestamp) {
@@ -2134,6 +2407,10 @@ class ProcessRecord(beam.DoFn):
func MyDoFn(ts beam.EventTime, word string) string { ... }
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" timestamp_param >}}
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**Window:**
To access the window an input element falls into, add a parameter of the type of the window used for the input `PCollection`.
@@ -2159,6 +2436,13 @@ Since `beam.Window` is an interface it's possible to type assert to the concrete
For example, when fixed windows are being used, the window is of type `window.IntervalWindow`.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+**Window:**
+To access the window an input element falls into, add a `pardo.windowParam()` to the context argument.
+If an element falls in multiple windows (for example, this will happen when using `SlidingWindows`), then the
+function will be invoked multiple times for the element, once for each window.
+{{< /paragraph >}}
+
{{< highlight java >}}
.of(new DoFn<String, String>() {
public void processElement(@Element String word, IntervalWindow window) {
@@ -2183,6 +2467,10 @@ func MyDoFn(w beam.Window, word string) string {
}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" window_param >}}
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**PaneInfo:**
When triggers are used, Beam provides a `PaneInfo` object that contains information about the current firing. Using `PaneInfo`
@@ -2202,6 +2490,13 @@ When triggers are used, Beam provides `beam.PaneInfo` object that contains infor
you can determine whether this is an early or a late firing, and how many times this window has already fired for this key.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+**PaneInfo:**
+To access information about the current firing when triggers are used, add a `pardo.paneInfoParam()` to the context argument.
+Using `beam.PaneInfo` you can determine whether this is an early or a late firing,
+and how many times this window has already fired for this key.
+{{< /paragraph >}}
+
{{< highlight java >}}
.of(new DoFn<String, String>() {
public void processElement(@Element String word, PaneInfo paneInfo) {
@@ -2223,6 +2518,10 @@ class ProcessRecord(beam.DoFn):
{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_paneinfo >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" pane_info_param >}}
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**PipelineOptions:**
The `PipelineOptions` for the current pipeline can always be accessed in a process method by adding it
@@ -2255,6 +2554,12 @@ Timers and States are explained in more detail in the
This feature isn't implemented in the Go SDK; see more at [Issue 20510](https://github.com/apache/beam/issues/20510). Once implemented, user defined Timer and State parameters can be used in a stateful DoFn.
{{< /paragraph >}}
+{{< paragraph class="language-typescript" >}}
+**Timer and State:**
+This feature isn't implemented in the Typescript SDK yet,
+but can be used from a cross-language transform.
+{{< /paragraph >}}
+
{{< highlight py >}}
class StatefulDoFn(beam.DoFn):
@@ -2378,11 +2683,20 @@ transform operations:
{{< code_sample "sdks/go/examples/snippets/04transforms.go" countwords_composite >}}
{{< /highlight >}}
+{{< highlight typescript >}}
+{{< code_sample "sdks/typescript/test/docs/programming_guide.ts" countwords_composite >}}
+{{< /highlight >}}
+
> **Note:** Because `Count` is itself a composite transform,
> `CountWords` is also a nested composite transform.
#### 4.6.2. Creating a composite transform {#composite-transform-creation}
+{{< paragraph class="language-typescript" >}}
+A PTransform in the Typescript SDK is simply a function that accepts and
+returns `PValue`s such as `PCollection`s.
+{{< /paragraph >}}
+
{{< paragraph class="language-java language-py" >}}
To create your own composite transform, create a subclass of the `PTransform`
class and override the `expand` method to specify the actual processing logic.
diff --git a/website/www/site/layouts/shortcodes/language-switcher.html b/website/www/site/layouts/shortcodes/language-switcher.html
index ec232d9e81d..a6bfe115fef 100644
--- a/website/www/site/layouts/shortcodes/language-switcher.html
+++ b/website/www/site/layouts/shortcodes/language-switcher.html
@@ -23,6 +23,9 @@
{{ if eq $lang "go" }}
<li data-type="language-go">Go SDK</li>
{{ end }}
+ {{ if eq $lang "typescript" }}
+ <li data-type="language-typescript">Typescript SDK</li>
+ {{ end }}
{{ end }}
</ul>
-</nav>
\ No newline at end of file
+</nav>