You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/05/19 16:46:16 UTC
[beam] branch master updated: Update the PTransform and associated APIs to be less class-based. (#17699)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6e5ca0c0fa4 Update the PTransform and associated APIs to be less class-based. (#17699)
6e5ca0c0fa4 is described below
commit 6e5ca0c0fa450a262fce5fb35dd7c66949e81401
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu May 19 10:46:10 2022 -0600
Update the PTransform and associated APIs to be less class-based. (#17699)
---
sdks/typescript/README.md | 6 +
sdks/typescript/src/apache_beam/coders/coders.ts | 28 ++-
.../src/apache_beam/examples/wordcount.ts | 19 +-
.../src/apache_beam/examples/wordcount_sql.ts | 10 +-
.../src/apache_beam/examples/wordcount_textio.ts | 19 +-
.../src/apache_beam/internal/pipeline.ts | 36 ++-
sdks/typescript/src/apache_beam/internal/urns.ts | 1 -
sdks/typescript/src/apache_beam/io/textio.ts | 22 +-
sdks/typescript/src/apache_beam/pvalue.ts | 104 +++++----
.../src/apache_beam/runners/direct_runner.ts | 12 +-
sdks/typescript/src/apache_beam/testing/assert.ts | 53 ++---
.../src/apache_beam/transforms/create.ts | 29 +--
.../src/apache_beam/transforms/external.ts | 23 +-
.../src/apache_beam/transforms/flatten.ts | 26 ++-
.../apache_beam/transforms/group_and_combine.ts | 161 +++++++-------
.../typescript/src/apache_beam/transforms/index.ts | 2 +-
.../src/apache_beam/transforms/internal.ts | 150 +++++++------
.../typescript/src/apache_beam/transforms/pardo.ts | 245 +++++++++------------
sdks/typescript/src/apache_beam/transforms/sql.ts | 35 ++-
.../src/apache_beam/transforms/transform.ts | 30 ++-
.../src/apache_beam/transforms/window.ts | 186 ++++++++--------
.../src/apache_beam/transforms/windowings.ts | 135 +++++-------
.../typescript/src/apache_beam/worker/operators.ts | 78 +++----
sdks/typescript/test/combine_test.ts | 56 +++--
sdks/typescript/test/primitives_test.ts | 88 ++++----
sdks/typescript/test/standard_coders_test.ts | 24 +-
sdks/typescript/test/wordcount.ts | 28 +--
27 files changed, 745 insertions(+), 861 deletions(-)
diff --git a/sdks/typescript/README.md b/sdks/typescript/README.md
index 7971bf1d563..745170bba65 100644
--- a/sdks/typescript/README.md
+++ b/sdks/typescript/README.md
@@ -93,6 +93,12 @@ used in Python. These can be "ordinary" javascript objects (which are passed
as is) or special DoFnParam objects which provide getters to element-specific
information (such as the current timestamp, window, or side input) at runtime.
+* Rather than introduce multiple-output complexity into the map/do operations
+themselves, producing multiple outputs is done by following with a new
+`Split` primitive that takes a
+`PCollection<{a?: AType, b: BType, ... }>` and produces an object
+`{a: PCollection<AType>, b: PCollection<BType>, ...}`.
+
* Javascript supports (and encourages) an asynchronous programming model, with
many libraries requiring use of the async/await paradigm.
As there is no way (by design) to go from the asynchronous style back to
diff --git a/sdks/typescript/src/apache_beam/coders/coders.ts b/sdks/typescript/src/apache_beam/coders/coders.ts
index ecb276a4bb6..3a9547fff34 100644
--- a/sdks/typescript/src/apache_beam/coders/coders.ts
+++ b/sdks/typescript/src/apache_beam/coders/coders.ts
@@ -38,16 +38,36 @@ interface Class<T> {
*/
class CoderRegistry {
internal_registry = {};
- get(urn: string): Class<Coder<unknown>> {
- const constructor: Class<Coder<unknown>> = this.internal_registry[urn];
+
+ getCoder(
+ urn: string,
+ payload: Uint8Array | undefined = undefined,
+ ...components: Coder<unknown>[]
+ ) {
+ const constructor: (...args) => Coder<unknown> =
+ this.internal_registry[urn];
if (constructor === undefined) {
throw new Error("Could not find coder for URN " + urn);
}
- return constructor;
+ if (payload && payload.length > 0) {
+ return constructor(payload, ...components);
+ } else {
+ return constructor(...components);
+ }
}
+ // TODO: Figure out how to branch on constructors (called with new) and
+ // ordinary functions.
register(urn: string, coderClass: Class<Coder<any>>) {
- this.internal_registry[urn] = coderClass;
+ this.registerClass(urn, coderClass);
+ }
+
+ registerClass(urn: string, coderClass: Class<Coder<any>>) {
+ this.registerConstructor(urn, (...args) => new coderClass(...args));
+ }
+
+ registerConstructor(urn: string, constructor: (...args) => Coder<any>) {
+ this.internal_registry[urn] = constructor;
}
}
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount.ts b/sdks/typescript/src/apache_beam/examples/wordcount.ts
index 961afb43e9b..e34b8cab3c5 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount.ts
@@ -34,20 +34,7 @@ import * as yargs from "yargs";
import * as beam from "../../apache_beam";
import { createRunner } from "../runners/runner";
-
-import { count } from "../transforms/combiners";
-import { GroupBy } from "../transforms/group_and_combine";
-
-class CountElements extends beam.PTransform<
- beam.PCollection<any>,
- beam.PCollection<any>
-> {
- expand(input: beam.PCollection<any>) {
- return input
- .map((e) => ({ element: e }))
- .apply(new GroupBy("element").combining("element", count, "count"));
- }
-}
+import { countPerElement } from "../transforms/group_and_combine";
function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
return lines
@@ -55,13 +42,13 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
.flatMap(function* (line: string) {
yield* line.split(/[^a-z]+/);
})
- .apply(new CountElements("Count"));
+ .apply(countPerElement());
}
async function main() {
await createRunner(yargs.argv).run((root) => {
const lines = root.apply(
- new beam.Create([
+ beam.create([
"In the beginning God created the heaven and the earth.",
"And the earth was without form, and void; and darkness was upon the face of the deep.",
"And the Spirit of God moved upon the face of the waters.",
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
index f27a0899710..c2453d64b93 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
@@ -24,9 +24,7 @@ import * as textio from "../io/textio";
import { PortableRunner } from "../runners/portable_runner/runner";
-import * as internal from "../../apache_beam/transforms/internal";
-import { RowCoder } from "../coders/row_coder";
-import { SqlTransform } from "../transforms/sql";
+import { sqlTransform } from "../transforms/sql";
async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
@@ -34,13 +32,13 @@ async function main() {
environmentType: "LOOPBACK",
jobEndpoint: "localhost:3333",
}).run(async (root) => {
- const lines = root.apply(new beam.Create(["a", "b", "c", "c"]));
+ const lines = root.apply(beam.create(["a", "b", "c", "c"]));
const filtered = await lines
.map((w) => ({ word: w }))
- .apply(new internal.WithCoderInternal(RowCoder.fromJSON({ word: "str" })))
+ .apply(beam.withRowCoder({ word: "str" }))
.asyncApply(
- new SqlTransform(
+ sqlTransform(
"SELECT word, count(*) as c from PCOLLECTION group by word"
)
);
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
index 03f7149e285..686be3b0594 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
@@ -21,22 +21,9 @@
import * as beam from "../../apache_beam";
import * as textio from "../io/textio";
import { DirectRunner } from "../runners/direct_runner";
-
-import { count } from "../transforms/combiners";
-import { GroupBy } from "../transforms/group_and_combine";
-
import { PortableRunner } from "../runners/portable_runner/runner";
-class CountElements extends beam.PTransform<
- beam.PCollection<any>,
- beam.PCollection<any>
-> {
- expand(input: beam.PCollection<any>) {
- return input
- .map((e) => ({ element: e }))
- .apply(new GroupBy("element").combining("element", count, "count"));
- }
-}
+import { countPerElement } from "../transforms/group_and_combine";
function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
return lines
@@ -44,14 +31,14 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
.flatMap(function* (line: string) {
yield* line.split(/[^a-z]+/);
})
- .apply(new CountElements("Count"));
+ .apply(countPerElement());
}
async function main() {
// python apache_beam/runners/portability/local_job_service_main.py --port 3333
await new PortableRunner("localhost:3333").run(async (root) => {
const lines = await root.asyncApply(
- new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
+ textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
);
lines.apply(wordCount).map(console.log);
diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts
index 4b9f6a7b52a..f333bb6c99b 100644
--- a/sdks/typescript/src/apache_beam/internal/pipeline.ts
+++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts
@@ -21,13 +21,13 @@ import equal from "fast-deep-equal";
import * as runnerApi from "../proto/beam_runner_api";
import * as fnApi from "../proto/beam_fn_api";
import {
- PTransform,
- AsyncPTransform,
+ PTransformClass,
+ AsyncPTransformClass,
extractName,
} from "../transforms/transform";
-import { GlobalWindows } from "../transforms/windowings";
+import { globalWindows } from "../transforms/windowings";
import * as pvalue from "../pvalue";
-import { WindowInto } from "../transforms/window";
+import { createWindowingStrategyProto } from "../transforms/window";
import * as environments from "./environments";
import { Coder, globalRegistry as globalCoderRegistry } from "../coders/coders";
@@ -45,18 +45,14 @@ export class PipelineContext {
const this_ = this;
if (this.coders[coderId] == undefined) {
const coderProto = this.components.coders[coderId];
- const coderConstructor = globalCoderRegistry().get(coderProto.spec!.urn);
- const components = (coderProto.componentCoderIds || []).map(
- this_.getCoder.bind(this_)
+ const components: Coder<unknown>[] = (
+ coderProto.componentCoderIds || []
+ ).map(this_.getCoder.bind(this_));
+ this.coders[coderId] = globalCoderRegistry().getCoder(
+ coderProto.spec!.urn,
+ coderProto.spec!.payload,
+ ...components
);
- if (coderProto.spec!.payload?.length) {
- this.coders[coderId] = new coderConstructor(
- coderProto.spec!.payload,
- ...components
- );
- } else {
- this.coders[coderId] = new coderConstructor(...components);
- }
}
return this.coders[coderId];
}
@@ -132,13 +128,13 @@ export class Pipeline {
environments.defaultJsEnvironment();
this.context = new PipelineContext(this.proto.components!);
this.proto.components!.windowingStrategies[this.globalWindowing] =
- WindowInto.createWindowingStrategy(this, new GlobalWindows());
+ createWindowingStrategyProto(this, globalWindows());
}
preApplyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
- >(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
+ >(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
const this_ = this;
const transformId = this.context.createUniqueName("transform");
let parent: runnerApi.PTransform | undefined = undefined;
@@ -168,7 +164,7 @@ export class Pipeline {
applyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
- >(transform: PTransform<InputT, OutputT>, input: InputT) {
+ >(transform: PTransformClass<InputT, OutputT>, input: InputT) {
const { id: transformId, proto: transformProto } = this.preApplyTransform(
transform,
input
@@ -186,7 +182,7 @@ export class Pipeline {
async asyncApplyTransform<
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
- >(transform: AsyncPTransform<InputT, OutputT>, input: InputT) {
+ >(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
const { id: transformId, proto: transformProto } = this.preApplyTransform(
transform,
input
@@ -205,7 +201,7 @@ export class Pipeline {
InputT extends pvalue.PValue<any>,
OutputT extends pvalue.PValue<any>
>(
- transform: AsyncPTransform<InputT, OutputT>,
+ transform: AsyncPTransformClass<InputT, OutputT>,
transformProto: runnerApi.PTransform,
result: OutputT
) {
diff --git a/sdks/typescript/src/apache_beam/internal/urns.ts b/sdks/typescript/src/apache_beam/internal/urns.ts
index 4121444a308..e08ac5803bf 100644
--- a/sdks/typescript/src/apache_beam/internal/urns.ts
+++ b/sdks/typescript/src/apache_beam/internal/urns.ts
@@ -24,7 +24,6 @@ export const IDENTITY_DOFN_URN = "beam:dofn:identity:0.1";
export const SERIALIZED_JS_DOFN_INFO = "beam:dofn:serialized_js_dofn_info:v1";
export const SPLITTING_JS_DOFN_URN = "beam:dofn:splitting_dofn:v1";
-export const SPLITTING2_JS_DOFN_URN = "beam:dofn:splitting2_dofn:v1";
export const JS_WINDOW_INTO_DOFN_URN = "beam:dofn:js_window_into:v1";
export const JS_ASSIGN_TIMESTAMPS_DOFN_URN =
"beam:dofn:js_assign_timestamps:v1";
diff --git a/sdks/typescript/src/apache_beam/io/textio.ts b/sdks/typescript/src/apache_beam/io/textio.ts
index adf487d3f22..58cc5f241b4 100644
--- a/sdks/typescript/src/apache_beam/io/textio.ts
+++ b/sdks/typescript/src/apache_beam/io/textio.ts
@@ -19,28 +19,20 @@
import * as beam from "../../apache_beam";
import * as external from "../transforms/external";
-export class ReadFromText extends beam.AsyncPTransform<
- beam.Root,
- beam.PCollection<string>
-> {
- constructor(private filePattern: string) {
- super();
- }
-
- async asyncExpand(root: beam.Root) {
+export function readFromText(
+ filePattern: string
+): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
+ return async function readFromText(root: beam.Root) {
return await root.asyncApply(
- new external.RawExternalTransform<
- beam.PValue<any>,
- beam.PCollection<any>
- >(
+ external.rawExternalTransform<beam.Root, beam.PCollection<any>>(
"beam:transforms:python:fully_qualified_named",
{
constructor: "apache_beam.io.ReadFromText",
- kwargs: { file_pattern: this.filePattern },
+ kwargs: { file_pattern: filePattern },
},
// python apache_beam/runners/portability/expansion_service_main.py --fully_qualified_name_glob='*' --port 4444 --environment_type='beam:env:embedded_python:v1'
"localhost:4444"
)
);
- }
+ };
}
diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts
index bf4dd4ea9fd..1313f2241e7 100644
--- a/sdks/typescript/src/apache_beam/pvalue.ts
+++ b/sdks/typescript/src/apache_beam/pvalue.ts
@@ -21,10 +21,12 @@ import { Pipeline } from "./internal/pipeline";
import {
PTransform,
AsyncPTransform,
+ PTransformClass,
+ AsyncPTransformClass,
extractName,
withName,
} from "./transforms/transform";
-import { ParDo, DoFn } from "./transforms/pardo";
+import { parDo, DoFn } from "./transforms/pardo";
import * as runnerApi from "./proto/beam_runner_api";
/**
@@ -38,20 +40,18 @@ export class Root {
this.pipeline = pipeline;
}
- apply<OutputT extends PValue<any>>(
- transform: PTransform<Root, OutputT> | ((Root) => OutputT)
- ) {
- if (!(transform instanceof PTransform)) {
- transform = new PTransformFromCallable(transform);
+ apply<OutputT extends PValue<any>>(transform: PTransform<Root, OutputT>) {
+ if (!(transform instanceof PTransformClass)) {
+ transform = new PTransformClassFromCallable(transform);
}
return this.pipeline.applyTransform(transform, this);
}
async asyncApply<OutputT extends PValue<any>>(
- transform: AsyncPTransform<Root, OutputT> | ((Root) => Promise<OutputT>)
+ transform: AsyncPTransform<Root, OutputT>
) {
- if (!(transform instanceof AsyncPTransform)) {
- transform = new AsyncPTransformFromCallable(transform);
+ if (!(transform instanceof AsyncPTransformClass)) {
+ transform = new AsyncPTransformClassFromCallable(transform);
}
return await this.pipeline.asyncApplyTransform(transform, this);
}
@@ -83,21 +83,19 @@ export class PCollection<T> {
}
apply<OutputT extends PValue<any>>(
- transform: PTransform<PCollection<T>, OutputT> | ((PCollection) => OutputT)
+ transform: PTransform<PCollection<T>, OutputT>
) {
- if (!(transform instanceof PTransform)) {
- transform = new PTransformFromCallable(transform);
+ if (!(transform instanceof PTransformClass)) {
+ transform = new PTransformClassFromCallable(transform);
}
return this.pipeline.applyTransform(transform, this);
}
asyncApply<OutputT extends PValue<any>>(
- transform:
- | AsyncPTransform<PCollection<T>, OutputT>
- | ((PCollection) => Promise<OutputT>)
+ transform: AsyncPTransform<PCollection<T>, OutputT>
) {
- if (!(transform instanceof AsyncPTransform)) {
- transform = new AsyncPTransformFromCallable(transform);
+ if (!(transform instanceof AsyncPTransformClass)) {
+ transform = new AsyncPTransformClassFromCallable(transform);
}
return this.pipeline.asyncApplyTransform(transform, this);
}
@@ -111,7 +109,7 @@ export class PCollection<T> {
return this.apply(
withName(
"map(" + extractName(fn) + ")",
- new ParDo<T, OutputT, ContextT>(
+ parDo<T, OutputT, ContextT>(
{
process: function* (element: T, context: ContextT) {
// While it's legal to call a function with extra arguments which will
@@ -136,7 +134,7 @@ export class PCollection<T> {
return this.apply(
withName(
"flatMap(" + extractName(fn) + ")",
- new ParDo<T, OutputT, ContextT>(
+ parDo<T, OutputT, ContextT>(
{
process: function (element: T, context: ContextT) {
// While it's legal to call a function with extra arguments which will
@@ -158,7 +156,7 @@ export class PCollection<T> {
}
/**
- * The type of object that may be consumed or produced by a PTransform.
+ * The type of object that may be consumed or produced by a PTransformClass.
*/
export type PValue<T> =
| void
@@ -209,7 +207,7 @@ export function flattenPValue<T>(
* Wraps a PValue in a single object such that a transform can be applied to it.
*
* For example, Flatten takes a PCollection[] as input, but Array has no
- * apply(PTransform) method, so one writes
+ * apply(PTransformClass) method, so one writes
*
* P([pcA, pcB, pcC]).apply(new Flatten())
*/
@@ -221,21 +219,21 @@ class PValueWrapper<T extends PValue<any>> {
constructor(private pvalue: T) {}
apply<O extends PValue<any>>(
- transform: PTransform<T, O> | ((input: T) => O),
+ transform: PTransform<T, O>,
root: Root | null = null
) {
- if (!(transform instanceof PTransform)) {
- transform = new PTransformFromCallable(transform);
+ if (!(transform instanceof PTransformClass)) {
+ transform = new PTransformClassFromCallable(transform);
}
return this.pipeline(root).applyTransform(transform, this.pvalue);
}
async asyncApply<O extends PValue<any>>(
- transform: AsyncPTransform<T, O> | ((input: T) => Promise<O>),
+ transform: AsyncPTransform<T, O>,
root: Root | null = null
) {
- if (!(transform instanceof AsyncPTransform)) {
- transform = new AsyncPTransformFromCallable(transform);
+ if (!(transform instanceof AsyncPTransformClass)) {
+ transform = new AsyncPTransformClassFromCallable(transform);
}
return await this.pipeline(root).asyncApplyTransform(
transform,
@@ -253,35 +251,63 @@ class PValueWrapper<T extends PValue<any>> {
}
}
-class PTransformFromCallable<
+class PTransformClassFromCallable<
InputT extends PValue<any>,
OutputT extends PValue<any>
-> extends PTransform<InputT, OutputT> {
- expander: (InputT) => OutputT;
+> extends PTransformClass<InputT, OutputT> {
+ expander: (
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => OutputT;
- constructor(expander: (InputT) => OutputT) {
+ constructor(
+ expander: (
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => OutputT
+ ) {
super(extractName(expander));
this.expander = expander;
}
- expand(input: InputT) {
- return this.expander(input);
+ expandInternal(
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ return this.expander(input, pipeline, transformProto);
}
}
-class AsyncPTransformFromCallable<
+class AsyncPTransformClassFromCallable<
InputT extends PValue<any>,
OutputT extends PValue<any>
-> extends AsyncPTransform<InputT, OutputT> {
- expander: (InputT) => Promise<OutputT>;
+> extends AsyncPTransformClass<InputT, OutputT> {
+ expander: (
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => Promise<OutputT>;
- constructor(expander: (InputT) => Promise<OutputT>) {
+ constructor(
+ expander: (
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => Promise<OutputT>
+ ) {
super(extractName(expander));
this.expander = expander;
}
- async asyncExpand(input: InputT) {
- return this.expander(input);
+ async asyncExpandInternal(
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ return this.expander(input, pipeline, transformProto);
}
}
diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
index ff203d06c41..1482995eee5 100644
--- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts
+++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts
@@ -28,13 +28,13 @@ import { JobState_Enum } from "../proto/beam_job_api";
import { Pipeline } from "../internal/pipeline";
import { Root } from "../pvalue";
-import { Impulse, GroupByKey } from "../transforms/internal";
+import { impulse, groupByKey } from "../transforms/internal";
import { Runner, PipelineResult } from "./runner";
import * as worker from "../worker/worker";
import * as operators from "../worker/operators";
import { createStateKey } from "../worker/pardo_context";
import * as state from "../worker/state";
-import { ParDo } from "../transforms/pardo";
+import { parDo } from "../transforms/pardo";
import {
Window,
GlobalWindow,
@@ -121,7 +121,7 @@ export class DirectRunner extends Runner {
descriptor,
null!,
new state.CachingStateProvider(stateProvider),
- [Impulse.urn]
+ [impulse.urn]
);
await processor.process("bundle_id");
@@ -165,7 +165,7 @@ class DirectImpulseOperator implements operators.IOperator {
async finishBundle() {}
}
-operators.registerOperator(Impulse.urn, DirectImpulseOperator);
+operators.registerOperator(impulse.urn, DirectImpulseOperator);
// Only to be used in direct runner, as this will only group within a single bundle.
// TODO: (Extension) This could be used as a base for the PGBKOperation operator,
@@ -248,7 +248,7 @@ class DirectGbkOperator implements operators.IOperator {
}
}
-operators.registerOperator(GroupByKey.urn, DirectGbkOperator);
+operators.registerOperator(groupByKey.urn, DirectGbkOperator);
/**
* Rewrites the pipeline to be suitable for running as a single "bundle."
@@ -305,7 +305,7 @@ function rewriteSideInputs(p: runnerApi.Pipeline, pipelineStateRef: string) {
for (const [transformId, transform] of Object.entries(transforms)) {
if (
transform.spec != undefined &&
- transform.spec.urn == ParDo.urn &&
+ transform.spec.urn == parDo.urn &&
Object.keys(transform.inputs).length > 1
) {
const spec = runnerApi.ParDoPayload.fromBinary(transform.spec!.payload);
diff --git a/sdks/typescript/src/apache_beam/testing/assert.ts b/sdks/typescript/src/apache_beam/testing/assert.ts
index 0d6451c5b45..930ffca6931 100644
--- a/sdks/typescript/src/apache_beam/testing/assert.ts
+++ b/sdks/typescript/src/apache_beam/testing/assert.ts
@@ -17,7 +17,7 @@
*/
import * as beam from "../../apache_beam";
-import { GlobalWindows } from "../../apache_beam/transforms/windowings";
+import { globalWindows } from "../../apache_beam/transforms/windowings";
import * as internal from "../transforms/internal";
import * as assert from "assert";
@@ -28,55 +28,39 @@ function callAssertDeepEqual(a, b) {
}
// TODO: (Naming)
-export class AssertDeepEqual extends beam.PTransform<
- beam.PCollection<any>,
- void
-> {
- expected: any[];
-
- constructor(expected: any[]) {
- super("AssertDeepEqual");
- this.expected = expected;
- }
-
- expand(pcoll: beam.PCollection<any>) {
- const expected = this.expected;
+export function assertDeepEqual<T>(
+ expected: T[]
+): beam.PTransform<beam.PCollection<T>, void> {
+ return function assertDeepEqual(pcoll: beam.PCollection<T>) {
pcoll.apply(
- new Assert("Assert", (actual) => {
- const actualArray: any[] = [...actual];
+ assertContentsSatisfies((actual: T[]) => {
+ const actualArray: T[] = [...actual];
expected.sort();
actualArray.sort();
callAssertDeepEqual(actualArray, expected);
})
);
- }
+ };
}
-export class Assert extends beam.PTransform<beam.PCollection<any>, void> {
- check: (actual: any[]) => void;
-
- constructor(name: string, check: (actual: any[]) => void) {
- super(name);
- this.check = check;
- }
-
- expand(pcoll: beam.PCollection<any>) {
- const check = this.check;
+export function assertContentsSatisfies<T>(
+ check: (actual: T[]) => void
+): beam.PTransform<beam.PCollection<T>, void> {
+ function expand(pcoll: beam.PCollection<T>) {
// We provide some value here to ensure there is at least one element
// so the DoFn gets invoked.
const singleton = pcoll
.root()
- .apply(new beam.Impulse())
+ .apply(beam.impulse())
.map((_) => ({ tag: "expected" }));
// CoGBK.
const tagged = pcoll
.map((e) => ({ tag: "actual", value: e }))
- .apply(new beam.WindowInto(new GlobalWindows()));
+ .apply(beam.windowInto(globalWindows()));
beam
.P([singleton, tagged])
- .apply(new beam.Flatten())
- .map((e) => ({ key: 0, value: e }))
- .apply(new internal.GroupByKey()) // TODO: GroupBy.
+ .apply(beam.flatten())
+ .apply(beam.groupBy((e) => 0))
.map(
beam.withName("extractActual", (kv) => {
const actual: any[] =
@@ -86,6 +70,11 @@ export class Assert extends beam.PTransform<beam.PCollection<any>, void> {
})
);
}
+
+ return beam.withName(
+ `assertContentsSatisfies(${beam.extractName(check)})`,
+ expand
+ );
}
import { requireForSerialization } from "../serialization";
diff --git a/sdks/typescript/src/apache_beam/transforms/create.ts b/sdks/typescript/src/apache_beam/transforms/create.ts
index e3413d3efb3..bf5eefa673a 100644
--- a/sdks/typescript/src/apache_beam/transforms/create.ts
+++ b/sdks/typescript/src/apache_beam/transforms/create.ts
@@ -17,34 +17,19 @@
*/
import { PTransform, withName } from "./transform";
-import { Impulse } from "./internal";
+import { impulse } from "./internal";
import { Root, PCollection } from "../pvalue";
/**
* A Ptransform that represents a 'static' source with a list of elements passed at construction time. It
* returns a PCollection that contains the elements in the input list.
- *
- * @extends PTransform
*/
-export class Create<T> extends PTransform<Root, PCollection<T>> {
- elements: T[];
-
- /**
- * Construct a new Create PTransform.
- * @param elements - the list of elements in the PCollection
- */
- constructor(elements: T[]) {
- super("Create");
- this.elements = elements;
+export function create<T>(elements: T[]): PTransform<Root, PCollection<T>> {
+ function create(root: Root): PCollection<T> {
+ return root
+ .apply(impulse())
+ .flatMap(withName("ExtractElements", (_) => elements));
}
- expand(root: Root) {
- const elements = this.elements;
- // TODO: (Cleanup) Store encoded values and conditionally shuffle.
- return root.apply(new Impulse()).flatMap(
- withName("ExtractElements", function* blarg(_) {
- yield* elements;
- })
- );
- }
+ return create;
}
diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts
index 73ba81faca7..bfb3ebe4182 100644
--- a/sdks/typescript/src/apache_beam/transforms/external.ts
+++ b/sdks/typescript/src/apache_beam/transforms/external.ts
@@ -49,10 +49,27 @@ import * as service from "../utils/service";
// TODO: (API) (Types) This class expects PCollections to already have the
// correct Coders. It would be great if we could infer coders, or at least have
// a cleaner way to specify them than using internal.WithCoderInternal.
-export class RawExternalTransform<
+export function rawExternalTransform<
InputT extends PValue<any>,
OutputT extends PValue<any>
-> extends transform.AsyncPTransform<InputT, OutputT> {
+>(
+ urn: string,
+ payload: Uint8Array | { [key: string]: any },
+ serviceProviderOrAddress: string | (() => Promise<service.Service>),
+ inferPValueType: boolean = true
+): transform.AsyncPTransform<InputT, OutputT> {
+ return new RawExternalTransform(
+ urn,
+ payload,
+ serviceProviderOrAddress,
+ inferPValueType
+ );
+}
+
+class RawExternalTransform<
+ InputT extends PValue<any>,
+ OutputT extends PValue<any>
+> extends transform.AsyncPTransformClass<InputT, OutputT> {
static namespaceCounter = 0;
static freshNamespace() {
return "namespace_" + RawExternalTransform.namespaceCounter++ + "_";
@@ -110,7 +127,7 @@ export class RawExternalTransform<
request.components!.transforms[fakeImpulseNamespace + pcId] =
runnerApi.PTransform.create({
uniqueName: fakeImpulseNamespace + "_create_" + pcId,
- spec: { urn: internal.Impulse.urn, payload: new Uint8Array() },
+ spec: { urn: internal.impulse.urn, payload: new Uint8Array() },
outputs: { main: pcId },
});
}
diff --git a/sdks/typescript/src/apache_beam/transforms/flatten.ts b/sdks/typescript/src/apache_beam/transforms/flatten.ts
index 80e80a98d19..57d29c236a5 100644
--- a/sdks/typescript/src/apache_beam/transforms/flatten.ts
+++ b/sdks/typescript/src/apache_beam/transforms/flatten.ts
@@ -17,28 +17,32 @@
*/
import * as runnerApi from "../proto/beam_runner_api";
-import { PTransform } from "./transform";
+import { PTransform, withName } from "./transform";
import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { GeneralObjectCoder } from "../coders/js_coders";
-export class Flatten<T> extends PTransform<PCollection<T>[], PCollection<T>> {
- // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:flatten:v1";
- name = "Flatten";
-
- expandInternal(
+export function flatten<T>(): PTransform<PCollection<T>[], PCollection<T>> {
+ function expandInternal(
inputs: PCollection<T>[],
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: Flatten.urn,
+ urn: flatten.urn,
payload: null!,
});
- // TODO: Input coder if they're all the same? UnionCoder?
- return pipeline.createPCollectionInternal<T>(new GeneralObjectCoder());
+ // TODO: UnionCoder if they're not the same?
+ const coders = new Set(
+ inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc))
+ );
+ const coder =
+ coders.size == 1 ? [...coders][0] : new GeneralObjectCoder<T>();
+ return pipeline.createPCollectionInternal<T>(coder);
}
+
+ return withName("flatten", expandInternal);
}
+
+flatten.urn = "beam:transform:flatten:v1";
diff --git a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
index 6e7072cc74c..4f7de5a0d47 100644
--- a/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
+++ b/sdks/typescript/src/apache_beam/transforms/group_and_combine.ts
@@ -17,8 +17,9 @@
*/
import { KV } from "../values";
-import { PTransform } from "./transform";
+import { PTransform, PTransformClass, withName } from "./transform";
import { PCollection } from "../pvalue";
+import { PValue } from "../pvalue";
import * as internal from "./internal";
import { count } from "./combiners";
@@ -40,13 +41,13 @@ export interface CombineFn<I, A, O> {
type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);
/**
- * A PTransform that takes a PCollection of elements, and returns a PCollection
+ * A PTransformClass that takes a PCollection of elements, and returns a PCollection
* of elements grouped by a field, multiple fields, an expression that is used
* as the grouping key.
*
- * @extends PTransform
+ * @extends PTransformClass
*/
-export class GroupBy<T, K> extends PTransform<
+export class GroupBy<T, K> extends PTransformClass<
PCollection<T>,
PCollection<KV<K, Iterable<T>>>
> {
@@ -72,7 +73,7 @@ export class GroupBy<T, K> extends PTransform<
const keyFn = this.keyFn;
return input
.map((x) => ({ key: keyFn(x), value: x }))
- .apply(new internal.GroupByKey());
+ .apply(internal.groupByKey());
}
combining<I>(
@@ -88,6 +89,13 @@ export class GroupBy<T, K> extends PTransform<
}
}
+export function groupBy<T, K>(
+ key: string | string[] | ((element: T) => K),
+ keyName: string | undefined = undefined
+): GroupBy<T, K> {
+ return new GroupBy<T, K>(key, keyName);
+}
+
/**
* Groups all elements of the input PCollection together.
*
@@ -95,7 +103,7 @@ export class GroupBy<T, K> extends PTransform<
* loses parallelization benefits in bringing all elements of a distributed
* PCollection together on a single machine.
*/
-export class GroupGlobally<T> extends PTransform<
+export class GroupGlobally<T> extends PTransformClass<
PCollection<T>,
PCollection<Iterable<T>>
> {
@@ -120,7 +128,11 @@ export class GroupGlobally<T> extends PTransform<
}
}
-class GroupByAndCombine<T, O> extends PTransform<
+export function groupGlobally<T>() {
+ return new GroupGlobally<T>();
+}
+
+class GroupByAndCombine<T, O> extends PTransformClass<
PCollection<T>,
PCollection<O>
> {
@@ -167,8 +179,8 @@ class GroupByAndCombine<T, O> extends PTransform<
};
})
.apply(
- new internal.CombinePerKey(
- new MultiCombineFn(this_.combiners.map((c) => c.combineFn))
+ internal.combinePerKey(
+ multiCombineFn(this_.combiners.map((c) => c.combineFn))
)
)
.map(function constructResult(kv) {
@@ -190,34 +202,30 @@ class GroupByAndCombine<T, O> extends PTransform<
}
}
-// TODO: (API) Does this carry its weight as a top-level built-in function?
-// Cons: It's just a combine. Pros: It's kind of a non-obvious one.
-// NOTE: The encoded form of the elements will be used for equality checking.
-export class CountPerElement<T> extends PTransform<
+export function countPerElement<T>(): PTransform<
PCollection<T>,
PCollection<{ element: T; count: number }>
> {
- expand(input) {
- return input.apply(
- new GroupBy((e) => e, "element").combining((e) => e, count, "count")
- );
- }
+ return withName(
+ "countPerElement",
+ groupBy((e) => e, "element").combining((e) => e, count, "count")
+ );
}
-export class CountGlobally<T> extends PTransform<
+export function countGlobally<T>(): PTransform<
PCollection<T>,
PCollection<number>
> {
- expand(input) {
- return input
+ return withName("countGlobally", (input) =>
+ input
.apply(new GroupGlobally().combining((e) => e, count, "count"))
- .map((o) => o.count);
- }
+ .map((o) => o.count)
+ );
}
function toCombineFn<I>(combiner: Combiner<I>): CombineFn<I, any, any> {
if (typeof combiner == "function") {
- return new BinaryCombineFn<I>(combiner);
+ return binaryCombineFn<I>(combiner);
} else {
return combiner;
}
@@ -229,71 +237,62 @@ interface CombineSpec<T, I, O> {
resultName: string;
}
-class BinaryCombineFn<I> implements CombineFn<I, I | undefined, I> {
- constructor(private combiner: (a: I, b: I) => I) {}
- createAccumulator() {
- return undefined;
- }
- addInput(a, b) {
- if (a == undefined) {
- return b;
- } else {
- return this.combiner(a, b);
- }
- }
- mergeAccumulators(accs) {
- return accs.filter((a) => a != undefined).reduce(this.combiner, undefined);
- }
- extractOutput(a) {
- return a;
- }
+function binaryCombineFn<I>(
+ combiner: (a: I, b: I) => I
+): CombineFn<I, I | undefined, I> {
+ return {
+ createAccumulator: () => undefined,
+ addInput: (a, b) => (a === undefined ? b : combiner(a, b)),
+ mergeAccumulators: (accs) =>
+ [...accs].filter((a) => a != undefined).reduce(combiner, undefined),
+ extractOutput: (a) => a,
+ };
}
-class MultiCombineFn implements CombineFn<any[], any[], any[]> {
- batchSize: number = 100;
- // TODO: (Typescript) Is there a way to indicate type parameters match the above?
- constructor(private combineFns: CombineFn<any, any, any>[]) {}
+// TODO: (Typescript) Is there a way to indicate type parameters match the above?
+function multiCombineFn(
+ combineFns: CombineFn<any, any, any>[],
+ batchSize: number = 100
+): CombineFn<any[], any[], any[]> {
+ return {
+ createAccumulator: () => combineFns.map((fn) => fn.createAccumulator()),
- createAccumulator() {
- return this.combineFns.map((fn) => fn.createAccumulator());
- }
-
- addInput(accumulators: any[], inputs: any[]) {
- // TODO: (Cleanup) Does javascript have a clean zip?
- let result: any[] = [];
- for (let i = 0; i < this.combineFns.length; i++) {
- result.push(this.combineFns[i].addInput(accumulators[i], inputs[i]));
- }
- return result;
- }
+ addInput: (accumulators: any[], inputs: any[]) => {
+ // TODO: (Cleanup) Does javascript have a clean zip?
+ let result: any[] = [];
+ for (let i = 0; i < combineFns.length; i++) {
+ result.push(combineFns[i].addInput(accumulators[i], inputs[i]));
+ }
+ return result;
+ },
- mergeAccumulators(accumulators: Iterable<any[]>) {
- const combineFns = this.combineFns;
- let batches = combineFns.map((fn) => [fn.createAccumulator()]);
- for (let acc of accumulators) {
+ mergeAccumulators: (accumulators: Iterable<any[]>) => {
+ let batches = combineFns.map((fn) => [fn.createAccumulator()]);
+ for (let acc of accumulators) {
+ for (let i = 0; i < combineFns.length; i++) {
+ batches[i].push(acc[i]);
+ if (batches[i].length > batchSize) {
+ batches[i] = [combineFns[i].mergeAccumulators(batches[i])];
+ }
+ }
+ }
for (let i = 0; i < combineFns.length; i++) {
- batches[i].push(acc[i]);
- if (batches[i].length > this.batchSize) {
+ if (batches[i].length > 1) {
batches[i] = [combineFns[i].mergeAccumulators(batches[i])];
}
}
- }
- for (let i = 0; i < combineFns.length; i++) {
- if (batches[i].length > 1) {
- batches[i] = [combineFns[i].mergeAccumulators(batches[i])];
- }
- }
- return batches.map((batch) => batch[0]);
- }
+ return batches.map((batch) => batch[0]);
+ },
- extractOutput(accumulators: any[]) {
- // TODO: (Cleanup) Does javascript have a clean zip?
- let result: any[] = [];
- for (let i = 0; i < this.combineFns.length; i++) {
- result.push(this.combineFns[i].extractOutput(accumulators[i]));
- }
- return result;
- }
+ extractOutput: (accumulators: any[]) => {
+ // TODO: (Cleanup) Does javascript have a clean zip?
+ let result: any[] = [];
+ for (let i = 0; i < combineFns.length; i++) {
+ result.push(combineFns[i].extractOutput(accumulators[i]));
+ }
+ return result;
+ },
+ };
}
// TODO: (Typescript) Can I type T as "something that has this key" and/or,
@@ -330,7 +329,5 @@ function extractFn<T, K>(extractor: string | string[] | ((T) => K)) {
import { requireForSerialization } from "../serialization";
requireForSerialization("apache_beam.transforms.pardo", exports);
requireForSerialization("apache_beam.transforms.pardo", {
- BinaryCombineFn: BinaryCombineFn,
GroupByAndCombine: GroupByAndCombine,
- MultiCombineFn: MultiCombineFn,
});
diff --git a/sdks/typescript/src/apache_beam/transforms/index.ts b/sdks/typescript/src/apache_beam/transforms/index.ts
index c659f87204f..f0d49a6f5c3 100644
--- a/sdks/typescript/src/apache_beam/transforms/index.ts
+++ b/sdks/typescript/src/apache_beam/transforms/index.ts
@@ -23,7 +23,7 @@ export * from "./group_and_combine";
export * from "./pardo";
export * from "./transform";
export * from "./window";
-export { Impulse } from "./internal";
+export { impulse, withRowCoder } from "./internal";
import { requireForSerialization } from "../serialization";
requireForSerialization("apache_beam.transforms", exports);
diff --git a/sdks/typescript/src/apache_beam/transforms/internal.ts b/sdks/typescript/src/apache_beam/transforms/internal.ts
index 27f03c836ab..1e257704cd1 100644
--- a/sdks/typescript/src/apache_beam/transforms/internal.ts
+++ b/sdks/typescript/src/apache_beam/transforms/internal.ts
@@ -19,75 +19,82 @@
import * as runnerApi from "../proto/beam_runner_api";
import * as urns from "../internal/urns";
-import { PTransform, withName } from "./transform";
+import {
+ PTransform,
+ PTransformClass,
+ withName,
+ extractName,
+} from "./transform";
import { PCollection, Root } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { Coder } from "../coders/coders";
import { BytesCoder, KVCoder, IterableCoder } from "../coders/required_coders";
-import { ParDo } from "./pardo";
+import { parDo } from "./pardo";
import { GeneralObjectCoder } from "../coders/js_coders";
+import { RowCoder } from "../coders/row_coder";
import { KV } from "../values";
import { CombineFn } from "./group_and_combine";
/**
- * `Impulse` is the basic *source* primitive `PTransform`. It receives a Beam
+ * `Impulse` is the basic *source* primitive `PTransform`. It receives a Beam
* Root as input, and returns a `PCollection` of `Uint8Array` with a single
* element with length=0 (i.e. the empty byte array: `new Uint8Array("")`).
*
* `Impulse` is used to start the execution of a pipeline with a single element
* that can trigger execution of a source or SDF.
*/
-export class Impulse extends PTransform<Root, PCollection<Uint8Array>> {
- // static urn: string = runnerApi.StandardPTransforms_Primitives.IMPULSE.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:impulse:v1";
-
- constructor() {
- super("Impulse"); // TODO: (Unique names) pass null/nothing and get from reflection
- }
-
- expandInternal(
+export function impulse(): PTransform<Root, PCollection<Uint8Array>> {
+ function expandInternal(
input: Root,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
- ): PCollection<Uint8Array> {
+ ) {
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: Impulse.urn,
+ urn: impulse.urn,
payload: urns.IMPULSE_BUFFER,
});
transformProto.environmentId = "";
return pipeline.createPCollectionInternal(new BytesCoder());
}
+
+ return withName("impulse", expandInternal);
}
+impulse.urn = "beam:transform:impulse:v1";
+
// TODO: (API) Should we offer a method on PCollection to do this?
-export class WithCoderInternal<T> extends PTransform<
- PCollection<T>,
- PCollection<T>
-> {
- constructor(private coder: Coder<T>) {
- super("WithCoderInternal(" + coder + ")");
- }
- expandInternal(
- input: PCollection<T>,
- pipeline: Pipeline,
- transformProto: runnerApi.PTransform
- ) {
- // IDENTITY rather than Flatten for better fusion.
- transformProto.spec = {
- urn: ParDo.urn,
- payload: runnerApi.ParDoPayload.toBinary(
- runnerApi.ParDoPayload.create({
- doFn: runnerApi.FunctionSpec.create({
- urn: urns.IDENTITY_DOFN_URN,
- payload: undefined!,
- }),
- })
- ),
- };
-
- return pipeline.createPCollectionInternal<T>(this.coder);
- }
+export function withCoderInternal<T>(
+ coder: Coder<T>
+): PTransform<PCollection<T>, PCollection<T>> {
+ return withName(
+ `withCoderInternal(${extractName(coder)})`,
+ (
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => {
+ // IDENTITY rather than Flatten for better fusion.
+ transformProto.spec = {
+ urn: parDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.IDENTITY_DOFN_URN,
+ payload: undefined!,
+ }),
+ })
+ ),
+ };
+
+ return pipeline.createPCollectionInternal<T>(coder);
+ }
+ );
+}
+
+export function withRowCoder<T extends Object>(
+ exemplar: T
+): PTransform<PCollection<T>, PCollection<T>> {
+ return withCoderInternal(RowCoder.fromJSON(exemplar));
}
/**
@@ -101,15 +108,11 @@ export class WithCoderInternal<T> extends PTransform<
* `GroupByKey` operations are used under the hood to execute combines,
* streaming triggers, stateful transforms, etc.
*/
-export class GroupByKey<K, V> extends PTransform<
+export function groupByKey<K, V>(): PTransform<
PCollection<KV<K, V>>,
PCollection<KV<K, Iterable<V>>>
> {
- // static urn: string = runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:group_by_key:v1";
-
- expandInternal(
+ function expandInternal(
input: PCollection<KV<K, V>>,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
@@ -124,15 +127,15 @@ export class GroupByKey<K, V> extends PTransform<
if (inputCoderProto.spec!.urn != KVCoder.URN) {
return input
.apply(
- new WithCoderInternal(
+ withCoderInternal(
new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder())
)
)
- .apply(new GroupByKey());
+ .apply(groupByKey());
}
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: GroupByKey.urn,
+ urn: groupByKey.urn,
payload: undefined!,
});
transformProto.environmentId = "";
@@ -144,8 +147,13 @@ export class GroupByKey<K, V> extends PTransform<
const outputCoder = new KVCoder(keyCoder, iterableValueCoder);
return pipeline.createPCollectionInternal(outputCoder);
}
+
+ return withName("groupByKey", expandInternal);
}
+// TODO: (Cleanup) use runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn.
+groupByKey.urn = "beam:transform:group_by_key:v1";
+
/**
* This transform is used to perform aggregations over groups of elements.
*
@@ -159,28 +167,24 @@ export class GroupByKey<K, V> extends PTransform<
* before a `GroupByKey` and after the `GroupByKey`. The partial aggregations
* help reduce the original data into a single aggregator per key per worker.
*/
-export class CombinePerKey<K, InputT, AccT, OutputT> extends PTransform<
- PCollection<KV<K, InputT>>,
- PCollection<KV<K, OutputT>>
-> {
- constructor(private combineFn: CombineFn<InputT, AccT, OutputT>) {
- super();
+export function combinePerKey<K, InputT, AccT, OutputT>(
+ combineFn: CombineFn<InputT, AccT, OutputT>
+): PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
+ function expandInternal(input: PCollection<KV<any, InputT>>) {
+ return input //
+ .apply(groupByKey())
+ .map(
+ withName("applyCombine", (kv) => ({
+ key: kv.key,
+ value: combineFn.extractOutput(
+ kv.value.reduce(
+ combineFn.addInput.bind(combineFn),
+ combineFn.createAccumulator()
+ )
+ ),
+ }))
+ );
}
- // Let the runner do the combiner lifting, when possible, handling timestamps,
- // windowing, and triggering as needed.
- expand(input: PCollection<KV<any, InputT>>) {
- const combineFn = this.combineFn;
- return input.apply(new GroupByKey()).map(
- withName("applyCombine", (kv) => ({
- key: kv.key,
- value: combineFn.extractOutput(
- kv.value.reduce(
- combineFn.addInput.bind(combineFn),
- combineFn.createAccumulator()
- )
- ),
- }))
- );
- }
+ return withName(`combinePerKey(${extractName(combineFn)})`, expandInternal);
}
diff --git a/sdks/typescript/src/apache_beam/transforms/pardo.ts b/sdks/typescript/src/apache_beam/transforms/pardo.ts
index d6c550299b9..e05c0cb14f5 100644
--- a/sdks/typescript/src/apache_beam/transforms/pardo.ts
+++ b/sdks/typescript/src/apache_beam/transforms/pardo.ts
@@ -23,7 +23,12 @@ import { GeneralObjectCoder } from "../coders/js_coders";
import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { serializeFn } from "../internal/serialize";
-import { PTransform, extractName } from "./transform";
+import {
+ PTransform,
+ PTransformClass,
+ withName,
+ extractName,
+} from "./transform";
import { PaneInfo, Instant, Window, WindowedValue } from "../values";
export interface DoFn<InputT, OutputT, ContextT = undefined> {
@@ -40,46 +45,36 @@ export interface DoFn<InputT, OutputT, ContextT = undefined> {
// TODO: (API) Do we need an AsyncDoFn (and async[Flat]Map) to be able to call
// async functions in the body of the fns. Or can they always be Async?
// The latter seems to have perf issues.
-// (For PTransforms, it's a major usability issue, but maybe we can always
+// (For PTransforms, it's a major usability issue, but maybe we can always
// await when calling user code. OTOH, I don't know what the performance
// impact would be for creating promises for every element of every operation
// which is typically a very performance critical spot to optimize.)
-export class ParDo<InputT, OutputT, ContextT = undefined> extends PTransform<
- PCollection<InputT>,
- PCollection<OutputT>
-> {
- private doFn: DoFn<InputT, OutputT, ContextT>;
- private context: ContextT;
- // static urn: string = runnerApi.StandardPTransforms_Primitives.PAR_DO.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:pardo:v1";
- // TODO: (Typescript) Can the arg be optional iff ContextT is undefined?
- constructor(
- doFn: DoFn<InputT, OutputT, ContextT>,
- contextx: ContextT = undefined!
- ) {
- super(() => "ParDo(" + extractName(doFn) + ")");
- this.doFn = doFn;
- this.context = contextx;
- }
-
- expandInternal(
+// TODO: (Typescript) Can the context arg be optional iff ContextT is undefined?
+export function parDo<
+ InputT,
+ OutputT,
+ ContextT extends Object | undefined = undefined
+>(
+ doFn: DoFn<InputT, OutputT, ContextT>,
+ context: ContextT = undefined!
+): PTransform<PCollection<InputT>, PCollection<OutputT>> {
+ function expandInternal(
input: PCollection<InputT>,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
// Extract and populate side inputs from the context.
- var context;
const sideInputs = {};
- if (typeof this.context == "object") {
- context = Object.create(this.context as Object);
+ var contextCopy;
+ if (typeof context == "object") {
+ contextCopy = Object.create(context as Object) as any;
const components = pipeline.context.components;
- for (const [name, value] of Object.entries(this.context)) {
+ for (const [name, value] of Object.entries(context)) {
if (value instanceof SideInputParam) {
const inputName = "side." + name;
transformProto.inputs[inputName] = value.pcoll.getId();
- context[name] = copySideInputWithId(value, inputName);
+ contextCopy[name] = copySideInputWithId(value, inputName);
const mainWindowingStrategyId =
components.pcollections[input.getId()].windowingStrategyId;
const sideWindowingStrategyId =
@@ -109,23 +104,23 @@ export class ParDo<InputT, OutputT, ContextT = undefined> extends PTransform<
},
};
} else {
- context[name] = value;
+ contextCopy[name] = value;
}
}
} else {
- context = this.context;
+ contextCopy = context;
}
// Now finally construct the proto.
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
+ urn: parDo.urn,
payload: runnerApi.ParDoPayload.toBinary(
runnerApi.ParDoPayload.create({
doFn: runnerApi.FunctionSpec.create({
urn: urns.SERIALIZED_JS_DOFN_INFO,
payload: serializeFn({
- doFn: this.doFn,
- context: context,
+ doFn: doFn,
+ context: contextCopy,
}),
}),
sideInputs: sideInputs,
@@ -140,89 +135,68 @@ export class ParDo<InputT, OutputT, ContextT = undefined> extends PTransform<
new GeneralObjectCoder()
);
}
+
+ return withName(`parDo(${extractName(doFn)})`, expandInternal);
}
+// TODO: (Cleanup) use runnerApi.StandardPTransforms_Primitives.PAR_DO.urn.
+parDo.urn = "beam:transform:pardo:v1";
+
+export type SplitOptions = {
+ knownTags?: string[];
+ unknownTagBehavior?: "error" | "ignore" | "rename" | undefined;
+ unknownTagName?: string;
+ exclusive?: boolean;
+};
+
+/**
+ * Splits a single PCollection of objects, with keys k, into an object of
+ * PCollections, with the same keys k, where each PCollection consists of the
+ * values associated with that key. That is,
+ *
+ * PCollection<{a: T, b: U, ...}> maps to {a: PCollection<T>, b: PCollection<U>, ...}
+ */
// TODO: (API) Consider as top-level method.
// TODO: Naming.
-// TODO: Allow default? Technically splitter can be implemented/wrapped to produce such.
-// TODO: Can we enforce splitter's output with the typing system to lie in targets?
-// TODO: (Optionally?) delete the switched-on field.
-// TODO: (API) Consider doing
-// [{a: aValue}, {g: bValue}, ...] => a: [aValue, ...], b: [bValue, ...]
-// instead of
-// [{key: 'a', aValueFields}, {key: 'b', bValueFields}, ...] =>
-// a: [{key: 'a', aValueFields}, ...], b: [{key: 'b', aValueFields}, ...],
-// (implemented below as Split2).
-export class Split<T> extends PTransform<
- PCollection<T>,
- { [key: string]: PCollection<T> }
-> {
- private tags: string[];
- constructor(private splitter: (T) => string, ...tags: string[]) {
- super("Split(" + tags + ")");
- this.tags = tags;
- }
- expandInternal(
+export function split<T extends { [key: string]: unknown }>(
+ tags: string[],
+ options: SplitOptions = {}
+): PTransform<PCollection<T>, { [P in keyof T]: PCollection<T[P]> }> {
+ function expandInternal(
input: PCollection<T>,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
- transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
- payload: runnerApi.ParDoPayload.toBinary(
- runnerApi.ParDoPayload.create({
- doFn: runnerApi.FunctionSpec.create({
- urn: urns.SPLITTING_JS_DOFN_URN,
- payload: serializeFn({ splitter: this.splitter }),
- }),
- })
- ),
- });
-
- const this_ = this;
- return Object.fromEntries(
- this_.tags.map((tag) => [
- tag,
- pipeline.createPCollectionInternal<T>(
- pipeline.context.getPCollectionCoderId(input)
- ),
- ])
- );
- }
-}
+ if (options.exclusive === undefined) {
+ options.exclusive = true;
+ }
+ if (options.unknownTagBehavior === undefined) {
+ options.unknownTagBehavior = "error";
+ }
+ if (
+ options.unknownTagBehavior == "rename" &&
+ !tags.includes(options.unknownTagName!)
+ ) {
+ tags.push(options.unknownTagName!);
+ }
+ if (options.knownTags === undefined) {
+ options.knownTags = tags;
+ }
-// TODO: (Typescript) Is it possible to type that this takes
-// PCollection<{a: T, b: U, ...}> to {a: PCollection<T>, b: PCollection<U>, ...}
-// Seems to requires a cast inside expandInternal. But at least the cast is contained there.
-export class Split2<T extends { [key: string]: unknown }> extends PTransform<
- PCollection<T>,
- { [P in keyof T]: PCollection<T[P]> }
-> {
- private tags: string[];
- constructor(...tags: string[]) {
- super("Split2(" + tags + ")");
- this.tags = tags;
- }
- expandInternal(
- input: PCollection<T>,
- pipeline: Pipeline,
- transformProto: runnerApi.PTransform
- ) {
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
+ urn: parDo.urn,
payload: runnerApi.ParDoPayload.toBinary(
runnerApi.ParDoPayload.create({
doFn: runnerApi.FunctionSpec.create({
- urn: urns.SPLITTING2_JS_DOFN_URN,
- payload: new Uint8Array(),
+ urn: urns.SPLITTING_JS_DOFN_URN,
+ payload: serializeFn(options),
}),
})
),
});
- const this_ = this;
return Object.fromEntries(
- this_.tags.map((tag) => [
+ tags.map((tag) => [
tag,
pipeline.createPCollectionInternal<T[typeof tag]>(
pipeline.context.getPCollectionCoderId(input)
@@ -230,6 +204,8 @@ export class Split2<T extends { [key: string]: unknown }> extends PTransform<
])
) as { [P in keyof T]: PCollection<T[P]> };
}
+
+ return withName(`Split(${tags})`, expandInternal);
}
/*
@@ -238,15 +214,11 @@ export class Split2<T extends { [key: string]: unknown }> extends PTransform<
* special `lookup` method to retrieve the relevant value associated with the
* currently-being-processed element.
*/
-export abstract class ParDoParam<T> {
- readonly parDoParamName: string;
-
+export class ParDoParam<T> {
// Provided externally.
private provider: ParamProvider | undefined;
- constructor(parDoParamName: string) {
- this.parDoParamName = parDoParamName;
- }
+ constructor(readonly parDoParamName: string) {}
// TODO: Nameing "get" seems to be special.
lookup(): T {
@@ -266,23 +238,17 @@ export interface ParamProvider {
provide<T>(param: ParDoParam<T>): T;
}
-export class WindowParam extends ParDoParam<Window> {
- constructor() {
- super("window");
- }
+export function windowParam(): ParDoParam<Window> {
+ return new ParDoParam<Window>("window");
}
-export class TimestampParam extends ParDoParam<Instant> {
- constructor() {
- super("timestamp");
- }
+export function timestampParam(): ParDoParam<Instant> {
+ return new ParDoParam<Instant>("timestamp");
}
// TODO: Naming. Should this be PaneParam?
-export class PaneInfoParam extends ParDoParam<PaneInfo> {
- constructor() {
- super("paneinfo");
- }
+export function paneInfoParam(): ParDoParam<PaneInfo> {
+ return new ParDoParam<PaneInfo>("paneinfo");
}
interface SideInputAccessor<PCollT, AccessorT, ValueT> {
@@ -330,35 +296,32 @@ function copySideInputWithId<PCollT, AccessorT, ValueT>(
return copy;
}
-export class IterableSideInput<T> extends SideInputParam<
- T,
- Iterable<T>,
- Iterable<T>
-> {
- constructor(pcoll: PCollection<T>) {
- super(pcoll, {
- accessPattern: "beam:side_input:iterable:v1",
- toValue: (iter: Iterable<T>) => iter,
- });
- }
+export function iterableSideInput<T>(
+ pcoll: PCollection<T>
+): SideInputParam<T, Iterable<T>, Iterable<T>> {
+ return new SideInputParam<T, Iterable<T>, Iterable<T>>(pcoll, {
+ accessPattern: "beam:side_input:iterable:v1",
+ toValue: (iter: Iterable<T>) => iter,
+ });
}
-export class SingletonSideInput<T> extends SideInputParam<T, Iterable<T>, T> {
- constructor(pcoll: PCollection<T>, defaultValue: T | undefined = undefined) {
- super(pcoll, {
- accessPattern: "beam:side_input:iterable:v1",
- toValue: (iter: Iterable<T>) => {
- const asArray = Array.from(iter);
- if (asArray.length == 0 && defaultValue != undefined) {
- return defaultValue;
- } else if (asArray.length == 1) {
- return asArray[0];
- } else {
- throw new Error("Expected a single element, got " + asArray.length);
- }
- },
- });
- }
+export function singletonSideInput<T>(
+ pcoll: PCollection<T>,
+ defaultValue: T | undefined = undefined
+): SideInputParam<T, Iterable<T>, T> {
+ return new SideInputParam<T, Iterable<T>, T>(pcoll, {
+ accessPattern: "beam:side_input:iterable:v1",
+ toValue: (iter: Iterable<T>) => {
+ const asArray = Array.from(iter);
+ if (asArray.length == 0 && defaultValue != undefined) {
+ return defaultValue;
+ } else if (asArray.length == 1) {
+ return asArray[0];
+ } else {
+ throw new Error("Expected a single element, got " + asArray.length);
+ }
+ },
+ });
}
// TODO: (Extension) Map side inputs.
diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts
index 33a92423c58..417a277609c 100644
--- a/sdks/typescript/src/apache_beam/transforms/sql.ts
+++ b/sdks/typescript/src/apache_beam/transforms/sql.ts
@@ -31,25 +31,23 @@ import { serviceProviderFromJavaGradleTarget } from "../utils/service";
* corresponding names can be used in the sql statement.
*
* The input(s) must be schema'd (i.e. use the RowCoder). This can be done
- * by explicitly setting the schema with internal.WithCoderInternal or passing
+ * by explicitly setting the schema with internal.withRowCoder or passing
* a prototype element in as a second argument, e.g.
*
* pcoll.applyAsync(
- * new SqlTransform(
+ * sqlTransform(
* "select a, b from PCOLLECTION",
* {a: 0, b: "string"},
* ));
*/
-export class SqlTransform<
+export function sqlTransform<
InputT extends PCollection<any> | { [key: string]: PCollection<any> }
-> extends transform.AsyncPTransform<InputT, PCollection<any>> {
+>(
+ query: string,
+ inputTypes = null
+): transform.AsyncPTransform<InputT, PCollection<any>> {
// TOOD: (API) (Typescript): How to infer input_types, or at least make it optional.
- constructor(private query: string, private inputTypes = null) {
- // TODO: Unique names. Should we truncate/omit the full SQL statement?
- super("Sql(" + query + ")");
- }
-
- async asyncExpand(input: InputT): Promise<PCollection<any>> {
+ async function expandInternal(input: InputT): Promise<PCollection<any>> {
function withCoder<T>(pcoll: PCollection<T>, type): PCollection<T> {
if (type == null) {
if (
@@ -67,33 +65,30 @@ export class SqlTransform<
}
return pcoll;
}
- return pcoll.apply(
- new internal.WithCoderInternal(row_coder.RowCoder.fromJSON(type))
- );
+ return pcoll.apply(internal.withRowCoder(type));
}
if (input instanceof PCollection) {
- input = withCoder(input, this.inputTypes) as InputT;
+ input = withCoder(input, inputTypes) as InputT;
} else {
input = Object.fromEntries(
Object.keys(input).map((tag) => [
tag,
- withCoder(
- input[tag],
- this.inputTypes == null ? null : this.inputTypes[tag]
- ),
+ withCoder(input[tag], inputTypes == null ? null : inputTypes[tag]),
])
) as InputT;
}
return await P(input).asyncApply(
- new external.RawExternalTransform(
+ external.rawExternalTransform(
"beam:external:java:sql:v1",
- { query: this.query },
+ { query: query },
serviceProviderFromJavaGradleTarget(
"sdks:java:extensions:sql:expansion-service:shadowJar"
)
)
);
}
+
+ return transform.withName(`sqlTransform(${query})`, expandInternal);
}
diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts
index 4d7fcfcdc02..55b1da53a75 100644
--- a/sdks/typescript/src/apache_beam/transforms/transform.ts
+++ b/sdks/typescript/src/apache_beam/transforms/transform.ts
@@ -63,7 +63,7 @@ export function extractName<T>(withName: T): string {
// call rather than forcing the asynchronous nature all the way up the call
// hierarchy).
-export class AsyncPTransform<
+export class AsyncPTransformClass<
InputT extends PValue<any>,
OutputT extends PValue<any>
> {
@@ -86,10 +86,10 @@ export class AsyncPTransform<
}
}
-export class PTransform<
+export class PTransformClass<
InputT extends PValue<any>,
OutputT extends PValue<any>
-> extends AsyncPTransform<InputT, OutputT> {
+> extends AsyncPTransformClass<InputT, OutputT> {
expand(input: InputT): OutputT {
throw new Error("Method expand has not been implemented.");
}
@@ -114,3 +114,27 @@ export class PTransform<
return this.expandInternal(input, pipeline, transformProto);
}
}
+
+export type AsyncPTransform<
+ InputT extends PValue<any>,
+ OutputT extends PValue<any>
+> =
+ | AsyncPTransformClass<InputT, OutputT>
+ | ((input: InputT) => Promise<OutputT>)
+ | ((
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => Promise<OutputT>);
+
+export type PTransform<
+ InputT extends PValue<any>,
+ OutputT extends PValue<any>
+> =
+ | PTransformClass<InputT, OutputT>
+ | ((input: InputT) => OutputT)
+ | ((
+ input: InputT,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => OutputT);
diff --git a/sdks/typescript/src/apache_beam/transforms/window.ts b/sdks/typescript/src/apache_beam/transforms/window.ts
index 0d66bcbb949..57a335ba580 100644
--- a/sdks/typescript/src/apache_beam/transforms/window.ts
+++ b/sdks/typescript/src/apache_beam/transforms/window.ts
@@ -19,12 +19,12 @@
import * as runnerApi from "../proto/beam_runner_api";
import * as urns from "../internal/urns";
-import { PTransform } from "./transform";
+import { PTransform, withName, extractName } from "./transform";
import { Coder } from "../coders/coders";
import { Window } from "../values";
import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
-import { ParDo } from "./pardo";
+import { parDo } from "./pardo";
import { serializeFn } from "../internal/serialize";
export interface WindowFn<W extends Window> {
@@ -35,108 +35,96 @@ export interface WindowFn<W extends Window> {
assignsToOneWindow: () => boolean;
}
-export class WindowInto<T, W extends Window> extends PTransform<
- PCollection<T>,
- PCollection<T>
-> {
- static createWindowingStrategy(
- pipeline: Pipeline,
- windowFn: WindowFn<any>,
- windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
- ): runnerApi.WindowingStrategy {
- let result: runnerApi.WindowingStrategy;
- if (windowingStrategyBase == undefined) {
- result = {
- windowFn: undefined!,
- windowCoderId: undefined!,
- mergeStatus: undefined!,
- assignsToOneWindow: undefined!,
- trigger: { trigger: { oneofKind: "default", default: {} } },
- accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
- outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
- closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
- onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
- allowedLateness: BigInt(0),
- environmentId: pipeline.defaultEnvironment,
- };
- } else {
- result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
- }
- result.windowFn = windowFn.toProto();
- result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
- result.mergeStatus = windowFn.isMerging()
- ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
- : runnerApi.MergeStatus_Enum.NON_MERGING;
- result.assignsToOneWindow = windowFn.assignsToOneWindow();
- return result;
- }
-
- constructor(
- private windowFn: WindowFn<W>,
- private windowingStrategyBase:
- | runnerApi.WindowingStrategy
- | undefined = undefined
- ) {
- super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+export function createWindowingStrategyProto(
+ pipeline: Pipeline,
+ windowFn: WindowFn<any>,
+ windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+): runnerApi.WindowingStrategy {
+ let result: runnerApi.WindowingStrategy;
+ if (windowingStrategyBase == undefined) {
+ result = {
+ windowFn: undefined!,
+ windowCoderId: undefined!,
+ mergeStatus: undefined!,
+ assignsToOneWindow: undefined!,
+ trigger: { trigger: { oneofKind: "default", default: {} } },
+ accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+ outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+ closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+ onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+ allowedLateness: BigInt(0),
+ environmentId: pipeline.defaultEnvironment,
+ };
+ } else {
+ result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
}
+ result.windowFn = windowFn.toProto();
+ result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+ result.mergeStatus = windowFn.isMerging()
+ ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+ : runnerApi.MergeStatus_Enum.NON_MERGING;
+ result.assignsToOneWindow = windowFn.assignsToOneWindow();
+ return result;
+}
- expandInternal(
- input: PCollection<T>,
- pipeline: Pipeline,
- transformProto: runnerApi.PTransform
- ) {
- transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
- payload: runnerApi.ParDoPayload.toBinary(
- runnerApi.ParDoPayload.create({
- doFn: runnerApi.FunctionSpec.create({
- urn: urns.JS_WINDOW_INTO_DOFN_URN,
- payload: serializeFn({ windowFn: this.windowFn }),
- }),
- })
- ),
- });
+export function windowInto<T, W extends Window>(
+ windowFn: WindowFn<W>,
+ windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+): PTransform<PCollection<T>, PCollection<T>> {
+ return withName(
+ `WindowInto(${extractName(windowFn)}, ${windowingStrategyBase})`,
+ (
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: parDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_WINDOW_INTO_DOFN_URN,
+ payload: serializeFn({ windowFn: windowFn }),
+ }),
+ })
+ ),
+ });
- const inputCoder = pipeline.context.getPCollectionCoderId(input);
- return pipeline.createPCollectionInternal<T>(
- inputCoder,
- WindowInto.createWindowingStrategy(
- pipeline,
- this.windowFn,
- this.windowingStrategyBase
- )
- );
- }
+ const inputCoder = pipeline.context.getPCollectionCoderId(input);
+ return pipeline.createPCollectionInternal<T>(
+ inputCoder,
+ createWindowingStrategyProto(pipeline, windowFn, windowingStrategyBase)
+ );
+ }
+ );
}
// TODO: (Cleanup) Add restrictions on moving backwards?
-export class AssignTimestamps<T> extends PTransform<
- PCollection<T>,
- PCollection<T>
-> {
- constructor(private func: (T, Instant) => typeof Instant) {
- super();
- }
-
- expandInternal(
- input: PCollection<T>,
- pipeline: Pipeline,
- transformProto: runnerApi.PTransform
- ) {
- transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
- payload: runnerApi.ParDoPayload.toBinary(
- runnerApi.ParDoPayload.create({
- doFn: runnerApi.FunctionSpec.create({
- urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
- payload: serializeFn({ func: this.func }),
- }),
- })
- ),
- });
+export function assignTimestamps<T>(
+ timestampFn: (T, Instant) => typeof Instant
+): PTransform<PCollection<T>, PCollection<T>> {
+ return withName(
+ `assignTimestamp(${extractName(timestampFn)})`,
+ (
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) => {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: parDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+ payload: serializeFn({ func: timestampFn }),
+ }),
+ })
+ ),
+ });
- return pipeline.createPCollectionInternal<T>(
- pipeline.context.getPCollectionCoderId(input)
- );
- }
+ return pipeline.createPCollectionInternal<T>(
+ pipeline.context.getPCollectionCoderId(input)
+ );
+ }
+ );
}
diff --git a/sdks/typescript/src/apache_beam/transforms/windowings.ts b/sdks/typescript/src/apache_beam/transforms/windowings.ts
index 2350e417e04..89c4941ed07 100644
--- a/sdks/typescript/src/apache_beam/transforms/windowings.ts
+++ b/sdks/typescript/src/apache_beam/transforms/windowings.ts
@@ -30,106 +30,70 @@ import {
} from "../coders/standard_coders";
import { GlobalWindow, Instant, IntervalWindow } from "../values";
-export class GlobalWindows implements WindowFn<GlobalWindow> {
- assignWindows(Instant) {
- return [new GlobalWindow()];
- }
- windowCoder() {
- return new GlobalWindowCoder();
- }
- toProto() {
- return {
+export function globalWindows(): WindowFn<GlobalWindow> {
+ return {
+ assignWindows: (Instant) => [new GlobalWindow()],
+ windowCoder: () => new GlobalWindowCoder(),
+ isMerging: () => false,
+ assignsToOneWindow: () => true,
+ toProto: () => ({
urn: "beam:window_fn:global_windows:v1",
payload: new Uint8Array(),
- };
- }
- isMerging() {
- return false;
- }
- assignsToOneWindow() {
- return true;
- }
+ }),
+ };
}
-export class FixedWindows implements WindowFn<IntervalWindow> {
- size: Long;
- offset: Instant; // TODO: (Cleanup) Or should this be a long as well?
-
+export function fixedWindows(
+ sizeSeconds: number | Long,
+ offsetSeconds: Instant = Long.fromValue(0)
+): WindowFn<IntervalWindow> {
// TODO: (Cleanup) Use a time library?
- constructor(
- sizeSeconds: number | Long,
- offsetSeconds: Instant = Long.fromValue(0)
- ) {
- if (typeof sizeSeconds == "number") {
- this.size = Long.fromValue(sizeSeconds).mul(1000);
- } else {
- this.size = sizeSeconds.mul(1000);
- }
- this.offset = offsetSeconds.mul(1000);
- }
+ const sizeMillis = secsToMillisLong(sizeSeconds);
+ const offsetMillis = secsToMillisLong(offsetSeconds);
- assignWindows(t: Instant) {
- const start = t.sub(t.sub(this.offset).mod(this.size));
- return [new IntervalWindow(start, start.add(this.size))];
- }
+ return {
+ assignWindows: (t: Instant) => {
+ const start = t.sub(t.sub(offsetMillis).mod(sizeMillis));
+ return [new IntervalWindow(start, start.add(sizeMillis))];
+ },
- windowCoder() {
- return new IntervalWindowCoder();
- }
+ windowCoder: () => new IntervalWindowCoder(),
+ isMerging: () => false,
+ assignsToOneWindow: () => true,
- toProto() {
- return {
+ toProto: () => ({
urn: "beam:window_fn:fixed_windows:v1",
payload: FixedWindowsPayload.toBinary({
- size: millisToProto(this.size),
- offset: millisToProto(this.offset),
+ size: millisToProto(sizeMillis),
+ offset: millisToProto(offsetMillis),
}),
- };
- }
-
- isMerging() {
- return false;
- }
-
- assignsToOneWindow() {
- return true;
- }
+ }),
+ };
}
-export class Sessions implements WindowFn<IntervalWindow> {
- gap: Long;
-
- constructor(gapSeconds: number | Long) {
- if (typeof gapSeconds == "number") {
- this.gap = Long.fromValue(gapSeconds).mul(1000);
- } else {
- this.gap = gapSeconds.mul(1000);
- }
- }
+export function sessions(gapSeconds: number | Long): WindowFn<IntervalWindow> {
+ const gapMillis = secsToMillisLong(gapSeconds);
- assignWindows(t: Instant) {
- return [new IntervalWindow(t, t.add(this.gap))];
- }
+ return {
+ assignWindows: (t: Instant) => [new IntervalWindow(t, t.add(gapMillis))],
+ windowCoder: () => new IntervalWindowCoder(),
+ isMerging: () => true,
+ assignsToOneWindow: () => true,
- windowCoder() {
- return new IntervalWindowCoder();
- }
-
- toProto() {
- return {
+ toProto: () => ({
urn: "beam:window_fn:session_windows:v1",
payload: SessionWindowsPayload.toBinary({
- gapSize: millisToProto(this.gap),
+ gapSize: millisToProto(gapMillis),
}),
- };
- }
-
- isMerging() {
- return true;
- }
+ }),
+ };
+}
- assignsToOneWindow() {
- return true;
+function secsToMillisLong(secs: number | Long): Long {
+ if (typeof secs == "number") {
+ return Long.fromValue(secs * 1000);
+ } else {
+ return secs.mul(1000);
}
}
@@ -139,3 +103,12 @@ function millisToProto(t: Long) {
import { requireForSerialization } from "../serialization";
requireForSerialization("apache_beam.transforms.windowings", exports);
+requireForSerialization("apache_beam.transforms.windowings", millisToProto);
+requireForSerialization(
+ "apache_beam.transforms.windowings",
+ FixedWindowsPayload
+);
+requireForSerialization(
+ "apache_beam.transforms.windowings",
+ SessionWindowsPayload
+);
diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts
index f7bfa71038b..366e794c0d6 100644
--- a/sdks/typescript/src/apache_beam/worker/operators.ts
+++ b/sdks/typescript/src/apache_beam/worker/operators.ts
@@ -30,7 +30,7 @@ import { PipelineContext } from "../internal/pipeline";
import { deserializeFn } from "../internal/serialize";
import { Coder, Context as CoderContext } from "../coders/coders";
import { Window, Instant, PaneInfo, WindowedValue } from "../values";
-import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { parDo, DoFn, ParDoParam, SplitOptions } from "../transforms/pardo";
import { WindowFn } from "../transforms/window";
import {
@@ -445,42 +445,37 @@ class IdentityParDoOperator implements IOperator {
class SplittingDoFnOperator implements IOperator {
constructor(
- private splitter: (any) => string,
- private receivers: { [key: string]: Receiver }
+ private receivers: { [key: string]: Receiver },
+ private options: SplitOptions
) {}
async startBundle() {}
process(wvalue: WindowedValue<unknown>) {
- const tag = this.splitter(wvalue.value);
- const receiver = this.receivers[tag];
- if (receiver) {
- return receiver.receive(wvalue);
- } else {
- // TODO: (API) Make this configurable.
+ const result = new ProcessResultBuilder();
+ const keys = Object.keys(wvalue.value as object);
+ if (this.options.exclusive && keys.length != 1) {
throw new Error(
- "Unexpected tag '" +
- tag +
- "' for " +
- wvalue.value +
- " not in " +
- [...Object.keys(this.receivers)]
+ "Multiple keys for exclusively split element: " + wvalue.value
);
}
- }
-
- async finishBundle() {}
-}
-
-class Splitting2DoFnOperator implements IOperator {
- constructor(private receivers: { [key: string]: Receiver }) {}
-
- async startBundle() {}
-
- process(wvalue: WindowedValue<unknown>) {
- const result = new ProcessResultBuilder();
- // TODO: (API) Should I exactly one instead of allowing a union?
- for (const tag of Object.keys(wvalue.value as object)) {
+ for (let tag of keys) {
+ if (!this.options.knownTags!.includes(tag)) {
+ if (this.options.unknownTagBehavior == "rename") {
+ tag = this.options.unknownTagName!;
+ } else if (this.options.unknownTagBehavior == "ignore") {
+ continue;
+ } else {
+ throw new Error(
+ "Unexpected tag '" +
+ tag +
+ "' for " +
+ wvalue.value +
+ " not in " +
+ this.options.knownTags
+ );
+ }
+ }
const receiver = this.receivers[tag];
if (receiver) {
result.add(
@@ -491,16 +486,6 @@ class Splitting2DoFnOperator implements IOperator {
pane: wvalue.pane,
})
);
- } else {
- // TODO: (API) Make this configurable.
- throw new Error(
- "Unexpected tag '" +
- tag +
- "' for " +
- wvalue.value +
- " not in " +
- [...Object.keys(this.receivers)]
- );
}
}
return result.build();
@@ -558,7 +543,7 @@ class AssignTimestampsParDoOperator implements IOperator {
}
registerOperatorConstructor(
- ParDo.urn,
+ parDo.urn,
(transformId: string, transform: PTransform, context: OperatorContext) => {
const receiver = context.getReceiver(
onlyElement(Object.values(transform.outputs))
@@ -590,22 +575,13 @@ registerOperatorConstructor(
);
} else if (spec.doFn?.urn == urns.SPLITTING_JS_DOFN_URN) {
return new SplittingDoFnOperator(
- deserializeFn(spec.doFn.payload!).splitter,
- Object.fromEntries(
- Object.entries(transform.outputs).map(([tag, pcId]) => [
- tag,
- context.getReceiver(pcId),
- ])
- )
- );
- } else if (spec.doFn?.urn == urns.SPLITTING2_JS_DOFN_URN) {
- return new Splitting2DoFnOperator(
Object.fromEntries(
Object.entries(transform.outputs).map(([tag, pcId]) => [
tag,
context.getReceiver(pcId),
])
- )
+ ),
+ deserializeFn(spec.doFn.payload!)
);
} else {
throw new Error("Unknown DoFn type: " + spec);
diff --git a/sdks/typescript/test/combine_test.ts b/sdks/typescript/test/combine_test.ts
index c98e420ee17..f54f0a90e80 100644
--- a/sdks/typescript/test/combine_test.ts
+++ b/sdks/typescript/test/combine_test.ts
@@ -25,17 +25,17 @@ import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runne
import * as combiners from "../src/apache_beam/transforms/combiners";
import {
CombineFn,
- GroupBy,
- GroupGlobally,
- CountPerElement,
- CountGlobally,
+ groupBy,
+ groupGlobally,
+ countPerElement,
+ countGlobally,
} from "../src/apache_beam/transforms/group_and_combine";
describe("Apache Beam combiners", function () {
it("runs wordcount with a countPerKey transform and asserts the result", async function () {
await new DirectRunner().run((root) => {
const lines = root.apply(
- new beam.Create([
+ beam.create([
"In the beginning God created the heaven and the earth.",
"And the earth was without form, and void; and darkness was upon the face of the deep.",
"And the Spirit of God moved upon the face of the waters.",
@@ -49,9 +49,9 @@ describe("Apache Beam combiners", function () {
yield* line.split(/[^a-z]+/);
})
.map((elm) => ({ key: elm, value: 1 }))
- .apply(new GroupBy("key").combining("value", combiners.sum, "value"))
+ .apply(groupBy("key").combining("value", combiners.sum, "value"))
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ key: "in", value: 1 },
{ key: "the", value: 9 },
{ key: "beginning", value: 1 },
@@ -86,9 +86,7 @@ describe("Apache Beam combiners", function () {
it("runs wordcount with a countGlobally transform and asserts the result", async function () {
await new DirectRunner().run((root) => {
const lines = root.apply(
- new beam.Create([
- "And God said, Let there be light: and there was light",
- ])
+ beam.create(["And God said, Let there be light: and there was light"])
);
lines
@@ -96,8 +94,8 @@ describe("Apache Beam combiners", function () {
.flatMap(function* splitWords(line: string) {
yield* line.split(/[^a-z]+/);
})
- .apply(new CountGlobally())
- .apply(new testing.AssertDeepEqual([11]));
+ .apply(countGlobally())
+ .apply(testing.assertDeepEqual([11]));
});
});
@@ -138,7 +136,7 @@ describe("Apache Beam combiners", function () {
await new DirectRunner().run((root) => {
const lines = root.apply(
- new beam.Create([
+ beam.create([
"In the beginning God created the heaven and the earth.",
"And the earth was without form, and void; and darkness was upon the face of the deep.",
"And the Spirit of God moved upon the face of the waters.",
@@ -153,12 +151,12 @@ describe("Apache Beam combiners", function () {
})
.map((word) => word.length)
.apply(
- new GroupGlobally()
+ groupGlobally()
.combining((c) => c, combiners.mean, "mean")
.combining((c) => c, unstableStdDevCombineFn(), "stdDev")
)
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ mean: 3.611111111111111, stdDev: 3.2746913580246897 },
])
);
@@ -168,7 +166,7 @@ describe("Apache Beam combiners", function () {
it("test GroupBy with combining", async function () {
await new DirectRunner().run((root) => {
const inputs = root.apply(
- new beam.Create([
+ beam.create([
{ k: "k1", a: 1, b: 100 },
{ k: "k1", a: 2, b: 200 },
{ k: "k2", a: 9, b: 1000 },
@@ -177,13 +175,13 @@ describe("Apache Beam combiners", function () {
inputs
.apply(
- new GroupBy("k")
+ groupBy("k")
.combining("a", combiners.max, "aMax")
.combining("a", combiners.sum, "aSum")
.combining("b", combiners.mean, "mean")
)
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ k: "k1", aMax: 2, aSum: 3, mean: 150 },
{ k: "k2", aMax: 9, aSum: 9, mean: 1000 },
])
@@ -194,7 +192,7 @@ describe("Apache Beam combiners", function () {
it("test GroupBy list with combining", async function () {
await new DirectRunner().run((root) => {
const inputs = root.apply(
- new beam.Create([
+ beam.create([
{ a: 1, b: 10, c: 100 },
{ a: 2, b: 10, c: 100 },
{ a: 1, b: 10, c: 400 },
@@ -202,18 +200,18 @@ describe("Apache Beam combiners", function () {
);
inputs
- .apply(new GroupBy(["a", "b"]).combining("c", combiners.sum, "sum"))
+ .apply(groupBy(["a", "b"]).combining("c", combiners.sum, "sum"))
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ a: 1, b: 10, sum: 500 },
{ a: 2, b: 10, sum: 100 },
])
);
inputs
- .apply(new GroupBy(["b", "c"]).combining("a", combiners.sum, "sum"))
+ .apply(groupBy(["b", "c"]).combining("a", combiners.sum, "sum"))
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ b: 10, c: 100, sum: 3 },
{ b: 10, c: 400, sum: 1 },
])
@@ -224,7 +222,7 @@ describe("Apache Beam combiners", function () {
it("test GroupBy expr with combining", async function () {
await new DirectRunner().run((root) => {
const inputs = root.apply(
- new beam.Create([
+ beam.create([
{ a: 1, b: 10 },
{ a: 0, b: 20 },
{ a: -1, b: 30 },
@@ -233,14 +231,14 @@ describe("Apache Beam combiners", function () {
inputs
.apply(
- new GroupBy((element: any) => element.a * element.a).combining(
+ groupBy((element: any) => element.a * element.a).combining(
"b",
combiners.sum,
"sum"
)
)
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ key: 1, sum: 40 },
{ key: 0, sum: 20 },
])
@@ -251,7 +249,7 @@ describe("Apache Beam combiners", function () {
it("test GroupBy with binary combinefn", async function () {
await new DirectRunner().run((root) => {
const inputs = root.apply(
- new beam.Create([
+ beam.create([
{ key: 0, value: 10 },
{ key: 1, value: 20 },
{ key: 0, value: 30 },
@@ -260,12 +258,12 @@ describe("Apache Beam combiners", function () {
inputs
.apply(
- new GroupBy("key")
+ groupBy("key")
.combining("value", (x, y) => x + y, "sum")
.combining("value", (x, y) => Math.max(x, y), "max")
)
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ key: 0, sum: 40, max: 30 },
{ key: 1, sum: 20, max: 20 },
])
diff --git a/sdks/typescript/test/primitives_test.ts b/sdks/typescript/test/primitives_test.ts
index 0840ca7c93c..c95c2a2633e 100644
--- a/sdks/typescript/test/primitives_test.ts
+++ b/sdks/typescript/test/primitives_test.ts
@@ -25,10 +25,6 @@ import {
IterableCoder,
KVCoder,
} from "../src/apache_beam/coders/standard_coders";
-import {
- GroupBy,
- GroupGlobally,
-} from "../src/apache_beam/transforms/group_and_combine";
import * as combiners from "../src/apache_beam/transforms/combiners";
import { GeneralObjectCoder } from "../src/apache_beam/coders/js_coders";
@@ -45,84 +41,74 @@ describe("primitives module", function () {
it("runs a map", async function () {
await new DirectRunner().run((root) => {
const pcolls = root
- .apply(new beam.Create([1, 2, 3]))
+ .apply(beam.create([1, 2, 3]))
.map((x) => x * x)
- .apply(new testing.AssertDeepEqual([1, 4, 9]));
+ .apply(testing.assertDeepEqual([1, 4, 9]));
});
});
it("runs a flatmap", async function () {
await new DirectRunner().run((root) => {
const pcolls = root
- .apply(new beam.Create(["a b", "c"]))
+ .apply(beam.create(["a b", "c"]))
.flatMap((s) => s.split(/ +/))
- .apply(new testing.AssertDeepEqual(["a", "b", "c"]));
+ .apply(testing.assertDeepEqual(["a", "b", "c"]));
});
});
it("runs a Splitter", async function () {
await new DirectRunner().run((root) => {
const pcolls = root
- .apply(new beam.Create(["apple", "apricot", "banana"]))
- .apply(new beam.Split((e) => e[0], "a", "b"));
- pcolls.a.apply(new testing.AssertDeepEqual(["apple", "apricot"]));
- pcolls.b.apply(new testing.AssertDeepEqual(["banana"]));
- });
- });
-
- it("runs a Splitter2", async function () {
- await new DirectRunner().run((root) => {
- const pcolls = root
- .apply(new beam.Create([{ a: 1 }, { b: 10 }, { a: 2, b: 20 }]))
- .apply(new beam.Split2("a", "b"));
- pcolls.a.apply(new testing.AssertDeepEqual([1, 2]));
- pcolls.b.apply(new testing.AssertDeepEqual([10, 20]));
+ .apply(beam.create([{ a: 1 }, { b: 10 }, { a: 2, b: 20 }]))
+ .apply(beam.split(["a", "b"], { exclusive: false }));
+ pcolls.a.apply(testing.assertDeepEqual([1, 2]));
+ pcolls.b.apply(testing.assertDeepEqual([10, 20]));
});
});
it("runs a map with context", async function () {
await new DirectRunner().run((root) => {
root
- .apply(new beam.Create([1, 2, 3]))
+ .apply(beam.create([1, 2, 3]))
.map((a: number, b: number) => a + b, 100)
- .apply(new testing.AssertDeepEqual([101, 102, 103]));
+ .apply(testing.assertDeepEqual([101, 102, 103]));
});
});
it("runs a map with singleton side input", async function () {
await new DirectRunner().run((root) => {
- const input = root.apply(new beam.Create([1, 2, 1]));
- const sideInput = root.apply(new beam.Create([4]));
+ const input = root.apply(beam.create([1, 2, 1]));
+ const sideInput = root.apply(beam.create([4]));
input
.map((e, context) => e / context.side.lookup(), {
- side: new pardo.SingletonSideInput(sideInput),
+ side: pardo.singletonSideInput(sideInput),
})
- .apply(new testing.AssertDeepEqual([0.25, 0.5, 0.25]));
+ .apply(testing.assertDeepEqual([0.25, 0.5, 0.25]));
});
});
it("runs a map with a side input sharing input root", async function () {
await new DirectRunner().run((root) => {
- const input = root.apply(new beam.Create([1, 2, 1]));
+ const input = root.apply(beam.create([1, 2, 1]));
// TODO: Can this type be inferred?
const sideInput: beam.PCollection<{ sum: number }> = input.apply(
- new GroupGlobally().combining((e) => e, combiners.sum, "sum")
+ beam.groupGlobally().combining((e) => e, combiners.sum, "sum")
);
input
.map((e, context) => e / context.side.lookup().sum, {
- side: new pardo.SingletonSideInput(sideInput),
+ side: pardo.singletonSideInput(sideInput),
})
- .apply(new testing.AssertDeepEqual([0.25, 0.5, 0.25]));
+ .apply(testing.assertDeepEqual([0.25, 0.5, 0.25]));
});
});
it("runs a map with window-sensitive context", async function () {
await new DirectRunner().run((root) => {
root
- .apply(new beam.Create([1, 2, 3, 4, 5, 10, 11, 12]))
- .apply(new beam.AssignTimestamps((t) => Long.fromValue(t * 1000)))
- .apply(new beam.WindowInto(new windowings.FixedWindows(10)))
- .apply(new beam.GroupBy((e: number) => ""))
+ .apply(beam.create([1, 2, 3, 4, 5, 10, 11, 12]))
+ .apply(beam.assignTimestamps((t) => Long.fromValue(t * 1000)))
+ .apply(beam.windowInto(windowings.fixedWindows(10)))
+ .apply(beam.groupBy((e: number) => ""))
.map(
withName(
"MapWithContext",
@@ -138,10 +124,10 @@ describe("primitives module", function () {
),
// This is the context to pass as the second argument.
// At each element, window.get() will return the associated window.
- { window: new pardo.WindowParam(), other: "A" }
+ { window: pardo.windowParam(), other: "A" }
)
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ key: "", value: [1, 2, 3, 4, 5], window_start_ms: 0, a: "A" },
{ key: "", value: [10, 11, 12], window_start_ms: 10000, a: "A" },
])
@@ -152,11 +138,11 @@ describe("primitives module", function () {
it("runs a WindowInto", async function () {
await new DirectRunner().run((root) => {
root
- .apply(new beam.Create(["apple", "apricot", "banana"]))
- .apply(new beam.WindowInto(new windowings.GlobalWindows()))
- .apply(new beam.GroupBy((e: string) => e[0]))
+ .apply(beam.create(["apple", "apricot", "banana"]))
+ .apply(beam.windowInto(windowings.globalWindows()))
+ .apply(beam.groupBy((e: string) => e[0]))
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ key: "a", value: ["apple", "apricot"] },
{ key: "b", value: ["banana"] },
])
@@ -167,12 +153,12 @@ describe("primitives module", function () {
it("runs a WindowInto IntervalWindow", async function () {
await new DirectRunner().run((root) => {
root
- .apply(new beam.Create([1, 2, 3, 4, 5, 10, 11, 12]))
- .apply(new beam.AssignTimestamps((t) => Long.fromValue(t * 1000)))
- .apply(new beam.WindowInto(new windowings.FixedWindows(10)))
- .apply(new beam.GroupBy((e: number) => ""))
+ .apply(beam.create([1, 2, 3, 4, 5, 10, 11, 12]))
+ .apply(beam.assignTimestamps((t) => Long.fromValue(t * 1000)))
+ .apply(beam.windowInto(windowings.fixedWindows(10)))
+ .apply(beam.groupBy((e: number) => ""))
.apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ key: "", value: [1, 2, 3, 4, 5] },
{ key: "", value: [10, 11, 12] },
])
@@ -185,7 +171,7 @@ describe("primitives module", function () {
// TODO: test output with direct runner.
it("runs a basic Impulse expansion", function () {
var p = new Pipeline();
- var res = new beam.Root(p).apply(new beam.Impulse());
+ var res = new beam.Root(p).apply(beam.impulse());
assert.equal(res.type, "pcollection");
assert.deepEqual(p.context.getPCollectionCoder(res), new BytesCoder());
@@ -193,7 +179,7 @@ describe("primitives module", function () {
it("runs a ParDo expansion", function () {
var p = new Pipeline();
var res = new beam.Root(p)
- .apply(new beam.Impulse())
+ .apply(beam.impulse())
.map(function (v: any) {
return v * 2;
})
@@ -211,11 +197,11 @@ describe("primitives module", function () {
it("runs a GroupBy expansion", function () {
var p = new Pipeline();
var res = new beam.Root(p)
- .apply(new beam.Impulse())
+ .apply(beam.impulse())
.map(function createElement(v) {
return { name: "pablo", lastName: "wat" };
})
- .apply(new GroupBy("lastName"));
+ .apply(beam.groupBy("lastName"));
assert.deepEqual(
p.context.getPCollectionCoder(res),
diff --git a/sdks/typescript/test/standard_coders_test.ts b/sdks/typescript/test/standard_coders_test.ts
index dd80ce12d66..3d34199fccb 100644
--- a/sdks/typescript/test/standard_coders_test.ts
+++ b/sdks/typescript/test/standard_coders_test.ts
@@ -137,29 +137,23 @@ describe("standard Beam coders on Javascript", function () {
contexts.forEach((context) => {
describe("in Context " + context, function () {
const spec = doc;
-
- const coderConstructor = globalRegistry().get(urn);
- var coder;
+ var components;
if (spec.coder.components) {
- var components;
- try {
- components = spec.coder.components.map(
- (c) => new (globalRegistry().get(c.urn))()
- );
- } catch (Error) {
- return;
- }
- coder = new coderConstructor(...components);
+ components = spec.coder.components.map(
+ // Second level coders have neither payloads nor components.
+ (c) => globalRegistry().getCoder(c.urn)
+ );
} else {
- coder = new coderConstructor();
+ components = [];
}
- describeCoder(coder, urn, context, spec);
+ const coder = globalRegistry().getCoder(urn, undefined, ...components);
+ runCoderTest(coder, urn, context, spec);
});
});
});
});
-function describeCoder<T>(coder: Coder<T>, urn, context, spec: CoderSpec) {
+function runCoderTest<T>(coder: Coder<T>, urn, context, spec: CoderSpec) {
describe(
util.format(
"coder %s (%s)",
diff --git a/sdks/typescript/test/wordcount.ts b/sdks/typescript/test/wordcount.ts
index f13655616e3..b7267ef5589 100644
--- a/sdks/typescript/test/wordcount.ts
+++ b/sdks/typescript/test/wordcount.ts
@@ -19,9 +19,6 @@
import * as beam from "../src/apache_beam";
import { DirectRunner } from "../src/apache_beam/runners/direct_runner";
import * as testing from "../src/apache_beam/testing/assert";
-import { KV } from "../src/apache_beam/values";
-import { GroupBy } from "../src/apache_beam/transforms/group_and_combine";
-import * as combiners from "../src/apache_beam/transforms/combiners";
import { PortableRunner } from "../src/apache_beam/runners/portable_runner/runner";
@@ -33,29 +30,14 @@ function wordCount(
.flatMap(function* splitWords(line: string) {
yield* line.split(/[^a-z]+/);
})
- .apply(new CountElements("Count"));
-}
-
-class CountElements extends beam.PTransform<
- beam.PCollection<any>,
- beam.PCollection<KV<any, number>>
-> {
- expand(input: beam.PCollection<any>) {
- return input.apply(
- new GroupBy((e) => e, "element").combining(
- (e) => 1,
- combiners.sum,
- "count"
- )
- );
- }
+ .apply(beam.countPerElement());
}
describe("wordcount", function () {
it("wordcount", async function () {
await new DirectRunner().run((root) => {
const lines = root.apply(
- new beam.Create([
+ beam.create([
"In the beginning God created the heaven and the earth.",
"And the earth was without form, and void; and darkness was upon the face of the deep.",
"And the Spirit of God moved upon the face of the waters.",
@@ -70,13 +52,11 @@ describe("wordcount", function () {
it("wordcount assert", async function () {
await new DirectRunner().run((root) => {
const lines = root.apply(
- new beam.Create([
- "And God said, Let there be light: and there was light",
- ])
+ beam.create(["And God said, Let there be light: and there was light"])
);
lines.apply(wordCount).apply(
- new testing.AssertDeepEqual([
+ testing.assertDeepEqual([
{ element: "and", count: 2 },
{ element: "god", count: 1 },
{ element: "said", count: 1 },