Posted to commits@beam.apache.org by ro...@apache.org on 2022/07/11 18:15:01 UTC

[beam] branch master updated: Use async as a suffix rather than a prefix for asynchronous variants. (#22134)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e16941d69f Use async as a suffix rather than a prefix for asynchronous variants. (#22134)
6e16941d69f is described below

commit 6e16941d69f7fa171a409e7bd43932ae1b25f082
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Mon Jul 11 11:14:54 2022 -0700

    Use async as a suffix rather than a prefix for asynchronous variants. (#22134)
    
    This is better aligned with standard JavaScript convention.

    Libraries often make the asynchronous version the default and name the
    Sync one explicitly; here we mark the async variants instead, since they
    are by far the more common and the more useful for pipeline construction.
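
    For illustration only (a hedged sketch, not part of the commit itself), a
    call site written against the renamed API looks roughly like the
    following; the import paths and the file pattern are assumptions, not
    something taken from the diff:

        import * as beam from "apache-beam";
        import * as textio from "apache-beam/io/textio";

        async function pipeline(root: beam.Root) {
          // readFromText is an AsyncPTransform, so it is applied with
          // applyAsync (formerly asyncApply) and the Promise is awaited.
          const lines: beam.PCollection<string> = await root.applyAsync(
            textio.readFromText("path/to/text_pattern")
          );
          // Synchronous transforms still chain with plain apply()/map().
          const lengths = lines.map((line) => line.length);
        }
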
---
 .../src/apache_beam/examples/wordcount_sql.ts      |  2 +-
 .../src/apache_beam/examples/wordcount_textio.ts   |  2 +-
 .../src/apache_beam/internal/pipeline.ts           |  4 +--
 sdks/typescript/src/apache_beam/io/avroio.ts       |  2 +-
 sdks/typescript/src/apache_beam/io/parquetio.ts    |  4 +--
 sdks/typescript/src/apache_beam/io/pubsub.ts       |  4 +--
 sdks/typescript/src/apache_beam/io/textio.ts       | 12 ++++----
 sdks/typescript/src/apache_beam/pvalue.ts          | 14 +++++-----
 .../src/apache_beam/transforms/external.ts         |  2 +-
 sdks/typescript/src/apache_beam/transforms/sql.ts  |  2 +-
 .../src/apache_beam/transforms/transform.ts        | 10 +++----
 sdks/typescript/test/docs/programming_guide.ts     |  2 +-
 sdks/typescript/test/io_test.ts                    | 32 +++++++++++-----------
 .../content/en/documentation/programming-guide.md  |  4 +--
 14 files changed, 48 insertions(+), 48 deletions(-)

diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
index c2453d64b93..2ea4204886c 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount_sql.ts
@@ -37,7 +37,7 @@ async function main() {
     const filtered = await lines
       .map((w) => ({ word: w }))
       .apply(beam.withRowCoder({ word: "str" }))
-      .asyncApply(
+      .applyAsync(
         sqlTransform(
           "SELECT word, count(*) as c from PCOLLECTION group by word"
         )
diff --git a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
index 686be3b0594..2841ec8f77d 100644
--- a/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
+++ b/sdks/typescript/src/apache_beam/examples/wordcount_textio.ts
@@ -37,7 +37,7 @@ function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
 async function main() {
   // python apache_beam/runners/portability/local_job_service_main.py --port 3333
   await new PortableRunner("localhost:3333").run(async (root) => {
-    const lines = await root.asyncApply(
+    const lines = await root.applyAsync(
       textio.readFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
     );
 
diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts
index db5592d0bb1..fe57f120969 100644
--- a/sdks/typescript/src/apache_beam/internal/pipeline.ts
+++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts
@@ -179,7 +179,7 @@ export class Pipeline {
     return this.postApplyTransform(transform, transformProto, result);
   }
 
-  async asyncApplyTransform<
+  async applyAsyncTransform<
     InputT extends pvalue.PValue<any>,
     OutputT extends pvalue.PValue<any>
   >(transform: AsyncPTransformClass<InputT, OutputT>, input: InputT) {
@@ -190,7 +190,7 @@ export class Pipeline {
     let result: OutputT;
     try {
       this.transformStack.push(transformId);
-      result = await transform.asyncExpandInternal(input, this, transformProto);
+      result = await transform.expandInternalAsync(input, this, transformProto);
     } finally {
       this.transformStack.pop();
     }
diff --git a/sdks/typescript/src/apache_beam/io/avroio.ts b/sdks/typescript/src/apache_beam/io/avroio.ts
index 225dea9d858..b6ebf45cf28 100644
--- a/sdks/typescript/src/apache_beam/io/avroio.ts
+++ b/sdks/typescript/src/apache_beam/io/avroio.ts
@@ -44,7 +44,7 @@ export function writeToAvro<T>(filePath: string, options: { schema: Schema }) {
         withCoderInternal(RowCoder.fromSchema(options.schema))
       );
     }
-    return pcoll.asyncApply(
+    return pcoll.applyAsync(
       schemaio<beam.PCollection<T>, {}>(
         "writeToAvro",
         "beam:transform:org.apache.beam:schemaio_avro_write:v1",
diff --git a/sdks/typescript/src/apache_beam/io/parquetio.ts b/sdks/typescript/src/apache_beam/io/parquetio.ts
index cdf3b2b65e2..e7c7f7dc66e 100644
--- a/sdks/typescript/src/apache_beam/io/parquetio.ts
+++ b/sdks/typescript/src/apache_beam/io/parquetio.ts
@@ -33,7 +33,7 @@ export function readFromParquet(
   } = {}
 ): (root: beam.Root) => Promise<beam.PCollection<any>> {
   return async function readFromParquet(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
         path: filePattern,
         format: "parquet",
@@ -57,7 +57,7 @@ export function writeToParquet(
       delete options.schema;
     }
     return {
-      filesWritten: await toWrite.asyncApply(
+      filesWritten: await toWrite.applyAsync(
         pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
           path: filePathPrefix,
           format: "parquet",
diff --git a/sdks/typescript/src/apache_beam/io/pubsub.ts b/sdks/typescript/src/apache_beam/io/pubsub.ts
index ead6c57e6f8..c5513fe4171 100644
--- a/sdks/typescript/src/apache_beam/io/pubsub.ts
+++ b/sdks/typescript/src/apache_beam/io/pubsub.ts
@@ -96,7 +96,7 @@ export function readFromPubSubWithAttributes(
 > {
   return async function readFromPubSubWithAttributes(root: beam.Root) {
     return (
-      await root.asyncApply(readFromPubSubWithAttributesRaw(options))
+      await root.applyAsync(readFromPubSubWithAttributesRaw(options))
     ).map((encoded) =>
       PubSub.protos.google.pubsub.v1.PubsubMessage.decode(encoded)
     );
@@ -126,7 +126,7 @@ export function writeToPubSub(topic: string, options: WriteOptions = {}) {
         PubSub.protos.google.pubsub.v1.PubsubMessage.encode({ data }).finish()
       )
       .apply(internal.withCoderInternal(new BytesCoder()))
-      .asyncApply(writeToPubSubRaw(topic, options));
+      .applyAsync(writeToPubSubRaw(topic, options));
   };
 }
 
diff --git a/sdks/typescript/src/apache_beam/io/textio.ts b/sdks/typescript/src/apache_beam/io/textio.ts
index 67a83727947..7dedbcf2698 100644
--- a/sdks/typescript/src/apache_beam/io/textio.ts
+++ b/sdks/typescript/src/apache_beam/io/textio.ts
@@ -30,7 +30,7 @@ export function readFromText(
   filePattern: string
 ): beam.AsyncPTransform<beam.Root, beam.PCollection<string>> {
   return async function readFromText(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform<beam.Root, beam.PCollection<string>>(
         "apache_beam.io.ReadFromText",
         {
@@ -59,7 +59,7 @@ export function writeToText(
       filesWritten: await pcoll
         .map((e) => (typeof e == "string" ? e : "" + e))
         .apply(withCoderInternal(new StrUtf8Coder()))
-        .asyncApply(
+        .applyAsync(
           pythonTransform("apache_beam.io.WriteToText", {
             file_path_prefix: filePathPrefix,
             ...camelToSnakeOptions(options),
@@ -74,7 +74,7 @@ export function readFromCsv(
   options: {} = {}
 ): (root: beam.Root) => Promise<beam.PCollection<any>> {
   return async function readFromCsv(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
         path: filePattern,
         format: "csv",
@@ -96,7 +96,7 @@ export function writeToCsv(
       toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema)));
     }
     return {
-      filesWritten: await toWrite.asyncApply(
+      filesWritten: await toWrite.applyAsync(
         pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
           path: filePathPrefix,
           format: "csv",
@@ -113,7 +113,7 @@ export function readFromJson(
   options: {} = {}
 ): (root: beam.Root) => Promise<beam.PCollection<any>> {
   return async function readFromJson(root: beam.Root) {
-    return root.asyncApply(
+    return root.applyAsync(
       pythonTransform("apache_beam.dataframe.io.ReadViaPandas", {
         path: filePattern,
         format: "json",
@@ -137,7 +137,7 @@ export function writeToJson(
       toWrite = toWrite.apply(withCoderInternal(RowCoder.fromSchema(schema)));
     }
     return {
-      filesWritten: await toWrite.asyncApply(
+      filesWritten: await toWrite.applyAsync(
         pythonTransform("apache_beam.dataframe.io.WriteViaPandas", {
           path: filePathPrefix,
           format: "json",
diff --git a/sdks/typescript/src/apache_beam/pvalue.ts b/sdks/typescript/src/apache_beam/pvalue.ts
index 3185c4771b7..7275095cc47 100644
--- a/sdks/typescript/src/apache_beam/pvalue.ts
+++ b/sdks/typescript/src/apache_beam/pvalue.ts
@@ -47,13 +47,13 @@ export class Root {
     return this.pipeline.applyTransform(transform, this);
   }
 
-  async asyncApply<OutputT extends PValue<any>>(
+  async applyAsync<OutputT extends PValue<any>>(
     transform: AsyncPTransform<Root, OutputT>
   ) {
     if (!(transform instanceof AsyncPTransformClass)) {
       transform = new AsyncPTransformClassFromCallable(transform);
     }
-    return await this.pipeline.asyncApplyTransform(transform, this);
+    return await this.pipeline.applyAsyncTransform(transform, this);
   }
 }
 
@@ -91,13 +91,13 @@ export class PCollection<T> {
     return this.pipeline.applyTransform(transform, this);
   }
 
-  asyncApply<OutputT extends PValue<any>>(
+  applyAsync<OutputT extends PValue<any>>(
     transform: AsyncPTransform<PCollection<T>, OutputT>
   ) {
     if (!(transform instanceof AsyncPTransformClass)) {
       transform = new AsyncPTransformClassFromCallable(transform);
     }
-    return this.pipeline.asyncApplyTransform(transform, this);
+    return this.pipeline.applyAsyncTransform(transform, this);
   }
 
   map<OutputT, ContextT>(
@@ -228,14 +228,14 @@ class PValueWrapper<T extends PValue<any>> {
     return this.pipeline(root).applyTransform(transform, this.pvalue);
   }
 
-  async asyncApply<O extends PValue<any>>(
+  async applyAsync<O extends PValue<any>>(
     transform: AsyncPTransform<T, O>,
     root: Root | null = null
   ) {
     if (!(transform instanceof AsyncPTransformClass)) {
       transform = new AsyncPTransformClassFromCallable(transform);
     }
-    return await this.pipeline(root).asyncApplyTransform(
+    return await this.pipeline(root).applyAsyncTransform(
       transform,
       this.pvalue
     );
@@ -302,7 +302,7 @@ class AsyncPTransformClassFromCallable<
     this.expander = expander;
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
diff --git a/sdks/typescript/src/apache_beam/transforms/external.ts b/sdks/typescript/src/apache_beam/transforms/external.ts
index af6d27e6623..2f66cca9d2d 100644
--- a/sdks/typescript/src/apache_beam/transforms/external.ts
+++ b/sdks/typescript/src/apache_beam/transforms/external.ts
@@ -101,7 +101,7 @@ class RawExternalTransform<
     }
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
diff --git a/sdks/typescript/src/apache_beam/transforms/sql.ts b/sdks/typescript/src/apache_beam/transforms/sql.ts
index 9170683c1cd..9f0e117f2f8 100644
--- a/sdks/typescript/src/apache_beam/transforms/sql.ts
+++ b/sdks/typescript/src/apache_beam/transforms/sql.ts
@@ -84,7 +84,7 @@ export function sqlTransform<
       ) as InputT;
     }
 
-    return await P(input).asyncApply(
+    return await P(input).applyAsync(
       external.rawExternalTransform(
         "beam:external:java:sql:v1",
         { query: query },
diff --git a/sdks/typescript/src/apache_beam/transforms/transform.ts b/sdks/typescript/src/apache_beam/transforms/transform.ts
index fe84009987b..da22cb70dcd 100644
--- a/sdks/typescript/src/apache_beam/transforms/transform.ts
+++ b/sdks/typescript/src/apache_beam/transforms/transform.ts
@@ -73,16 +73,16 @@ export class AsyncPTransformClass<
     this.beamName = name || this.constructor.name;
   }
 
-  async asyncExpand(input: InputT): Promise<OutputT> {
+  async expandAsync(input: InputT): Promise<OutputT> {
     throw new Error("Method expand has not been implemented.");
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
   ): Promise<OutputT> {
-    return this.asyncExpand(input);
+    return this.expandAsync(input);
   }
 }
 
@@ -94,7 +94,7 @@ export class PTransformClass<
     throw new Error("Method expand has not been implemented.");
   }
 
-  async asyncExpand(input: InputT): Promise<OutputT> {
+  async expandAsync(input: InputT): Promise<OutputT> {
     return this.expand(input);
   }
 
@@ -106,7 +106,7 @@ export class PTransformClass<
     return this.expand(input);
   }
 
-  async asyncExpandInternal(
+  async expandInternalAsync(
     input: InputT,
     pipeline: Pipeline,
     transformProto: runnerApi.PTransform
diff --git a/sdks/typescript/test/docs/programming_guide.ts b/sdks/typescript/test/docs/programming_guide.ts
index c6cb3131f48..fc038b47982 100644
--- a/sdks/typescript/test/docs/programming_guide.ts
+++ b/sdks/typescript/test/docs/programming_guide.ts
@@ -60,7 +60,7 @@ describe("Programming Guide Tested Samples", function () {
       // [START pipelines_constructing_reading]
       async function pipeline(root: beam.Root) {
         // Note that textio.ReadFromText is an AsyncPTransform.
-        const pcoll: PCollection<string> = await root.asyncApply(
+        const pcoll: PCollection<string> = await root.applyAsync(
           textio.ReadFromText("path/to/text_pattern")
         );
       }
diff --git a/sdks/typescript/test/io_test.ts b/sdks/typescript/test/io_test.ts
index 6947a2d65b7..3d4a2e03085 100644
--- a/sdks/typescript/test/io_test.ts
+++ b/sdks/typescript/test/io_test.ts
@@ -71,12 +71,12 @@ xdescribe("IO Tests", function () {
     await createRunner().run(async (root) => {
       await root //
         .apply(beam.create(lines))
-        .asyncApply(textio.writeToText(path.join(tempDir, "out.txt")));
+        .applyAsync(textio.writeToText(path.join(tempDir, "out.txt")));
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           textio.readFromText(path.join(tempDir, "out.txt*"))
         )
       ).apply(testing.assertDeepEqual(lines));
@@ -90,13 +90,13 @@ xdescribe("IO Tests", function () {
       await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(textio.writeToCsv(path.join(tempDir, "out.csv")));
+        .applyAsync(textio.writeToCsv(path.join(tempDir, "out.csv")));
     });
     console.log(tempDir);
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           textio.readFromCsv(path.join(tempDir, "out.csv*"))
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -110,12 +110,12 @@ xdescribe("IO Tests", function () {
       await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(textio.writeToJson(path.join(tempDir, "out.json")));
+        .applyAsync(textio.writeToJson(path.join(tempDir, "out.json")));
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           textio.readFromJson(path.join(tempDir, "out.json*"))
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -129,14 +129,14 @@ xdescribe("IO Tests", function () {
       await root //
         .apply(beam.create(elements))
         .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-        .asyncApply(
+        .applyAsync(
           parquetio.writeToParquet(path.join(tempDir, "out.parquet"))
         );
     });
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           parquetio.readFromParquet(path.join(tempDir, "out.parquet*"))
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -144,7 +144,7 @@ xdescribe("IO Tests", function () {
 
     await createRunner().run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           parquetio.readFromParquet(path.join(tempDir, "out.parquet*"), {
             columns: ["label", "rank"],
           })
@@ -176,14 +176,14 @@ xdescribe("IO Tests", function () {
     await createRunner(options).run(async (root) => {
       await root //
         .apply(beam.create(elements))
-        .asyncApply(
+        .applyAsync(
           avroio.writeToAvro(path_join(tempDir, "out.avro"), { schema })
         );
     });
 
     await createRunner(options).run(async (root) => {
       (
-        await root.asyncApply(
+        await root.applyAsync(
           avroio.readFromAvro(path_join(tempDir, "out.avro*"), { schema })
         )
       ).apply(testing.assertDeepEqual(elements));
@@ -239,19 +239,19 @@ xdescribe("IO Tests", function () {
         await root //
           .apply(beam.create(elements))
           .apply(internal.withCoderInternal(RowCoder.fromJSON(elements[0])))
-          .asyncApply(
+          .applyAsync(
             bigqueryio.writeToBigQuery(table, { createDisposition: "IfNeeded" })
           );
       });
 
       await createRunner(options).run(async (root) => {
-        (await root.asyncApply(bigqueryio.readFromBigQuery({ table }))) //
+        (await root.applyAsync(bigqueryio.readFromBigQuery({ table }))) //
           .apply(testing.assertDeepEqual(elements));
       });
 
       await createRunner(options).run(async (root) => {
         (
-          await root.asyncApply(
+          await root.applyAsync(
             bigqueryio.readFromBigQuery({
               query: `SELECT label, rank FROM ${table}`,
             })
@@ -286,7 +286,7 @@ xdescribe("IO Tests", function () {
     try {
       pipelineHandle = await createRunner(options).runAsync(async (root) => {
         await (
-          await root.asyncApply(
+          await root.applyAsync(
             pubsub.readFromPubSub({
               subscription: readSubscription.name,
             })
@@ -296,7 +296,7 @@ xdescribe("IO Tests", function () {
           .map((msg) => msg.toUpperCase())
           .map((msg) => new TextEncoder().encode(msg))
           .apply(internal.withCoderInternal(new BytesCoder()))
-          .asyncApply(pubsub.writeToPubSub(writeTopic.name));
+          .applyAsync(pubsub.writeToPubSub(writeTopic.name));
       });
       console.log("Pipeline started", pipelineHandle.jobId);
 
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index eeae46e0639..12098949f83 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -617,7 +617,7 @@ the transform itself as an argument, and the operation returns the output
 
 {{< highlight typescript >}}
 [Output PCollection] = [Input PCollection].apply([Transform])
-[Output PCollection] = await [Input PCollection].asyncApply([AsyncTransform])
+[Output PCollection] = await [Input PCollection].applyAsync([AsyncTransform])
 {{< /highlight >}}
 
 {{< paragraph class="language-java language-py language-typescript" >}}
@@ -735,7 +735,7 @@ One can apply transforms to these composite types by wrapping them with
 {{< paragraph class="language-typescript" >}}
 PTransforms come in two flavors, synchronous and asynchronous, depending on
 whether their *application* involves asynchronous invocations.
-An `AsyncTransform` must be applied with `asyncApply` and returns a `Promise`
+An `AsyncTransform` must be applied with `applyAsync` and returns a `Promise`
 which must be awaited before further pipeline construction.
 {{< /paragraph >}}
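
For a concrete reading of the programming-guide paragraph above, here is a
hedged TypeScript sketch (import paths and all non-SDK names are illustrative,
not taken from the commit): an asynchronous transform, written here as an
async callable, must be applied with applyAsync and its Promise awaited before
further transforms are chained.

    import * as beam from "apache-beam";
    import * as textio from "apache-beam/io/textio";

    // An AsyncPTransform expressed as an async callable: it takes the input
    // PValue and returns a Promise of the output PCollection.
    function readAndTrim(pattern: string) {
      return async (root: beam.Root) => {
        const lines = await root.applyAsync(textio.readFromText(pattern));
        return lines.map((line) => line.trim());
      };
    }

    async function pipeline(root: beam.Root) {
      // Asynchronous application: await the result before chaining further.
      const trimmed = await root.applyAsync(readAndTrim("path/to/*.txt"));
      // Synchronous transforms then chain with plain apply()/map().
      trimmed.map((line) => line.length);
    }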