Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/13 17:24:17 UTC

[GitHub] [beam] pcoet opened a new pull request, #22263: Rewrote Java multi-language pipeline quickstart

pcoet opened a new pull request, #22263:
URL: https://github.com/apache/beam/pull/22263

   Refreshed the content for the Java multi-language pipeline quickstart.
   
   @chamikaramj 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922179269


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.

Review Comment:
   Done.
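
As a rough illustration of what the `func` kwarg in the hunk above asks pandas to do, here is a plain-Java sketch (no Beam or Python involved) of `df.groupby('word').sum()`. The `GroupBySumSketch` and `WordCount` names are hypothetical, and the sketch assumes each input row carries a `word` string and a numeric `count`, which the diff itself does not show. A sorted map is used because pandas sorts group keys by default.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupBySumSketch {
  /** Hypothetical stand-in for a row with a "word" column and a numeric "count" column. */
  static final class WordCount {
    final String word;
    final long count;

    WordCount(String word, long count) {
      this.word = word;
      this.count = count;
    }
  }

  /** Groups rows by word and sums the counts in each group, with keys in sorted order. */
  static Map<String, Long> groupByWordAndSum(List<WordCount> rows) {
    Map<String, Long> totals = new TreeMap<>();
    for (WordCount row : rows) {
      // merge() inserts the count for a new word, or adds it to the running total.
      totals.merge(row.word, row.count, Long::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    List<WordCount> rows = Arrays.asList(
        new WordCount("to", 1),
        new WordCount("be", 1),
        new WordCount("or", 1),
        new WordCount("not", 1),
        new WordCount("to", 1),
        new WordCount("be", 1));
    System.out.println(groupByWordAndSum(rows)); // prints {be=2, not=1, or=1, to=2}
  }
}
```

In the real pipeline this aggregation runs in the Python SDK; the Java side only passes the lambda source as a string through `withKwarg`.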





[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922168920


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)

Review Comment:
   That's great, thanks. Done.





[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922183116


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart

Review Comment:
   I would push back on this a little bit. The intro makes it clear what we're doing in this quickstart: "In the examples below, the multi-language pipeline is built with the Beam Java SDK, and the cross-language transform is built with the Beam Python SDK." It's analogous to what we're doing with the Python multi-language quickstart. At some point in the future, we _might_ need to change the title and URL of both quickstarts and narrow the focus, but we're not there yet. I'd rather maintain the symmetry. But let me know...





[GitHub] [beam] pcoet commented on pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on PR #22263:
URL: https://github.com/apache/beam/pull/22263#issuecomment-1184997529

   > Probably worth mentioning that we also have higher-level [`DataframeTransform`](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransformTest.java#L51) so users don't need to manually specify Python DataframeTransform FQN.
   
   Thanks, Heejong. Done.




[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922193149


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a Dataframe. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For the general use case, you can simply run your multi-language pipeline using
+Gradle, as shown below.
+
+### Run with Dataflow runner
+
+The following script runs the example multi-language pipeline on Dataflow, using
+example text from a Cloud Storage bucket. You’ll need to adapt the script to
+your environment.
+
+```
+export OUTPUT_BUCKET=<bucket>
+export GCP_REGION=<region>
+export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
+export PYTHON_VERSION=<version>
+
+./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
+--runner=DataflowRunner \
+--output=gs://${OUTPUT_BUCKET}/count \
+--region=${GCP_REGION} \
+--experiments=use_runner_v2"
+```
+
+The pipeline outputs a file with the results to
+**gs://$OUTPUT_BUCKET/count-00000-of-00001**.
+
+Note: For Beam 2.40.0, you also need to set `sdkHarnessContainerImageOverrides`.
+
+## Advanced: Start an expansion service
+
+When building a job for a multi-language pipeline, Beam uses an
+[expansion service](/documentation/glossary/#expansion-service) to expand
+composite transforms. You must have at least one expansion service per remote
+SDK.
+
+In the general case, if you have a supported version of Python installed on your
+system, you can let `PythonExternalTransform` handle the details of creating and
+starting up the expansion service. But if you want to customize the environment
+or use transforms not available in the default Beam SDK, you might need to run
+your own expansion service.
+
+For example, to start the standard expansion service for a Python transform,
+[ExpansionServiceServicer](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py),
+follow these steps:
+
+1. Activate a Python virtual environment and install Apache Beam, as described
+   in the [Python quick start](/get-started/quickstart-py/).
+2. In the **beam/sdks/python** directory of the Beam source code, run the
+   following command:
+
+   ```
+   python apache_beam/runners/portability/expansion_service_main.py -p 18089 --fully_qualified_name_glob "*"
+   ```
+
+The command runs
+[expansion_service_main.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service_main.py), which starts the standard expansion service. You should see output similar to
+the following:
+
+```
+INFO:root:Default Python SDK image for environment is <IMAGE>

Review Comment:
   ACK. I removed the log output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922185812


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a Dataframe. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For the general use case, you can simply run your multi-language pipeline using

Review Comment:
   Done.





[GitHub] [beam] chamikaramj commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922572812


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart

Review Comment:
   Sounds good. Thanks.





[GitHub] [beam] ihji commented on pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #22263:
URL: https://github.com/apache/beam/pull/22263#issuecomment-1184778302

   It's probably worth mentioning that we also have a higher-level Java [`DataframeTransform`](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransformTest.java#L51), so users don't need to manually specify the fully qualified name (FQN) of the Python `DataframeTransform`.
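
   A minimal sketch of how the higher-level wrapper might replace the `PythonExternalTransform` call in the quickstart's word-count pipeline. It assumes the `DataframeTransform.of(String)` factory and the `withIndexes()` method exercised in the linked test. The class name `HigherLevelDataframeSketch` and the `countWords` helper are hypothetical, and `words` is assumed to be a schema-aware `PCollection<Row>` such as the output of `ExtractWordsFn`:

   ```java
   import org.apache.beam.sdk.extensions.python.transforms.DataframeTransform;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.Row;

   // Hypothetical wrapper class, for illustration only.
   public class HigherLevelDataframeSketch {

     // Assumes `words` already has a row schema set (for example via
     // setRowSchema(ExtractWordsFn.SCHEMA) in the quickstart pipeline).
     static PCollection<Row> countWords(PCollection<Row> words) {
       return words.apply(
           // The Java wrapper takes the Python lambda as a plain string, so the
           // Python FQN "apache_beam.dataframe.transforms.DataframeTransform"
           // does not need to be spelled out by the caller.
           DataframeTransform.of("lambda df: df.groupby('word').sum()")
               // Assumed to correspond to the include_indexes=true keyword argument.
               .withIndexes());
     }
   }
   ```

   Compared with the `PythonExternalTransform.from(...).withKwarg(...)` form, this should keep the Python fully qualified name and the keyword-argument names out of user code. Pointing the wrapper at a custom expansion service is not shown here.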




[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922175066


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922171046


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:

Review Comment:
   Done.
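
For readers of this thread: the quoted text says Python transforms are identified by their fully qualified name. Below is a minimal sketch of how such a dotted name can be split into a module path and a class name and then resolved. It shows the general Python mechanism only and is not taken from Beam's expansion service. The helper name `resolve_fully_qualified_name` is hypothetical, and a standard-library class stands in for `apache_beam.dataframe.transforms.DataframeTransform` so the sketch runs without Beam installed.

```python
import importlib


def resolve_fully_qualified_name(name):
    """Split 'package.module.ClassName' and return the named attribute.

    Hypothetical helper for illustration; not part of the Beam API.
    """
    module_path, _, attr_name = name.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted name, got {name!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# Stdlib stand-in so the sketch runs without Beam installed.
ordered_dict_cls = resolve_fully_qualified_name("collections.OrderedDict")
print(ordered_dict_cls.__name__)
```

With Beam installed, the same call with `"apache_beam.dataframe.transforms.DataframeTransform"` would be expected to return the transform class, which is why the quoted doc describes the package plus the class name as the transform's identifier.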





[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922171833


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.

Review Comment:
   Done.
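
For readers of this thread: the quoted pipeline passes `lambda df: df.groupby('word').sum()` to the Python `DataframeTransform`. The sketch below is a plain-Python stand-in for what that lambda computes. It uses a dictionary rather than pandas, and it assumes each input row carries a `word` field and a numeric `count` field; that row shape and the helper name `group_by_word_and_sum` are assumptions made for illustration, not something stated in the diff.

```python
from collections import defaultdict


def group_by_word_and_sum(rows):
    """Sum the 'count' field of each row, grouped by its 'word' field.

    Plain-Python stand-in for df.groupby('word').sum(); hypothetical helper.
    """
    totals = defaultdict(int)
    for row in rows:
        totals[row["word"]] += row["count"]
    # pandas groupby sorts group keys by default, so sort here as well.
    return dict(sorted(totals.items()))


# Assumed row shape: one row per extracted word, each with count 1.
rows = [
    {"word": "king", "count": 1},
    {"word": "lear", "count": 1},
    {"word": "king", "count": 1},
]
print(group_by_word_and_sum(rows))  # prints {'king': 2, 'lear': 1}
```

In the real pipeline the grouping key becomes the DataFrame index, which is presumably why the example also sets `include_indexes` to `true`: without it, the `word` column would likely be missing from the rows returned to Java.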





[GitHub] [beam] chamikaramj commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r921742185


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:

Review Comment:
   Probably swap this and the above notice.



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.

Review Comment:
   returns a stub transform for the Python cross-language transform that can be used directly in a Java pipeline.
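
   To make the "stub" idea concrete, here is a rough plain-Java sketch of what the two strings passed to `from` carry. `StubTransformSketch` and its methods are hypothetical names made up for illustration; this is not Beam's `PythonExternalTransform`, and it only models the fully qualified name and the optional expansion service address (`localhost:9097` is just a sample value).

```java
import java.util.Optional;

/** Hypothetical stand-in for a stub transform; not Beam's PythonExternalTransform. */
final class StubTransformSketch {
  private final String fullyQualifiedName;
  private final String expansionService; // empty string means "not provided"

  private StubTransformSketch(String fullyQualifiedName, String expansionService) {
    this.fullyQualifiedName = fullyQualifiedName;
    this.expansionService = expansionService;
  }

  /** Records the Python transform name and an optional "host:port" address. */
  static StubTransformSketch from(String fullyQualifiedName, String expansionService) {
    int lastDot = fullyQualifiedName.lastIndexOf('.');
    if (lastDot <= 0 || lastDot == fullyQualifiedName.length() - 1) {
      throw new IllegalArgumentException(
          "Expected module.ClassName, got: " + fullyQualifiedName);
    }
    return new StubTransformSketch(
        fullyQualifiedName, expansionService == null ? "" : expansionService);
  }

  /** Convenience overload for the case where no address is given. */
  static StubTransformSketch from(String fullyQualifiedName) {
    return from(fullyQualifiedName, "");
  }

  /** Everything before the last dot, e.g. apache_beam.dataframe.transforms. */
  String module() {
    return fullyQualifiedName.substring(0, fullyQualifiedName.lastIndexOf('.'));
  }

  /** Everything after the last dot, e.g. DataframeTransform. */
  String className() {
    return fullyQualifiedName.substring(fullyQualifiedName.lastIndexOf('.') + 1);
  }

  Optional<String> expansionService() {
    return expansionService.isEmpty() ? Optional.empty() : Optional.of(expansionService);
  }

  public static void main(String[] args) {
    StubTransformSketch stub =
        from("apache_beam.dataframe.transforms.DataframeTransform", "localhost:9097");
    System.out.println(stub.module());
    System.out.println(stub.className());
    System.out.println(stub.expansionService().orElse("(none given)"));
  }
}
```

   The point for the doc is that the Java object only holds a description of the Python transform until the pipeline is expanded, which is why "stub" reads better than "the transform".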



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.

Review Comment:
   Add,
   Note that the PythonExternalTransform API offers additional mechanisms for specifying args and kwargs for Python cross-language transforms.
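
   If it helps the wording, one way to picture args and kwargs is as the Python constructor call they correspond to. The sketch below is a mental model only: `KwargsSketch`, `CallableSource`, and `renderCall` are hypothetical names, and as far as I understand Beam does not build a source string like this when it sends the arguments to the expansion service.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that shows which Python call a set of args and kwargs stands for. */
final class KwargsSketch {

  /** Marks a string as Python source (such as a lambda) rather than a string literal. */
  static final class CallableSource {
    final String source;

    CallableSource(String source) {
      this.source = source;
    }
  }

  /** Renders a handful of Java value types as they would be written in Python. */
  static String toPython(Object value) {
    if (value == null) {
      return "None";
    }
    if (value instanceof CallableSource) {
      return ((CallableSource) value).source;
    }
    if (value instanceof Boolean) {
      return ((Boolean) value) ? "True" : "False";
    }
    if (value instanceof Number) {
      return value.toString();
    }
    // Everything else is treated as a single-quoted string literal.
    return "'" + value.toString().replace("\\", "\\\\").replace("'", "\\'") + "'";
  }

  /** Positional args come first, then keyword args in insertion order. */
  static String renderCall(String className, List<Object> args, Map<String, Object> kwargs) {
    List<String> parts = new ArrayList<>();
    for (Object arg : args) {
      parts.add(toPython(arg));
    }
    for (Map.Entry<String, Object> kwarg : kwargs.entrySet()) {
      parts.add(kwarg.getKey() + "=" + toPython(kwarg.getValue()));
    }
    return className + "(" + String.join(", ", parts) + ")";
  }

  public static void main(String[] args) {
    Map<String, Object> kwargs = new LinkedHashMap<>();
    kwargs.put("func", new CallableSource("lambda df: df.groupby('word').sum()"));
    kwargs.put("include_indexes", true);
    // prints: DataframeTransform(func=lambda df: df.groupby('word').sum(), include_indexes=True)
    System.out.println(renderCall("DataframeTransform", new ArrayList<>(), kwargs));
  }
}
```

   The two `withKwarg` calls in the example map onto the `func=` and `include_indexes=` parts of that call, so a sentence about the other ways to supply args and kwargs fits naturally right after this paragraph.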



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.

Review Comment:
   s/functions/transforms
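
   Related thought for readers who do not know pandas: it may be worth a sentence on what the Python transform computes here. A minimal plain-Java equivalent of `df.groupby('word').sum()` is sketched below, assuming each input row is a (word, count) pair; I have not checked that against `ExtractWordsFn.SCHEMA`, and `GroupBySumSketch` is a made-up name.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java equivalent of grouping rows by word and summing the counts. */
final class GroupBySumSketch {

  /** Sums the counts per word; TreeMap keeps keys sorted, like pandas groupby does by default. */
  static Map<String, Long> sumByWord(List<Map.Entry<String, Long>> rows) {
    Map<String, Long> totals = new TreeMap<>();
    for (Map.Entry<String, Long> row : rows) {
      totals.merge(row.getKey(), row.getValue(), Long::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> rows =
        List.of(Map.entry("king", 1L), Map.entry("lear", 1L), Map.entry("king", 1L));
    System.out.println(sumByWord(rows)); // prints {king=2, lear=1}
  }
}
```

   In the quickstart itself this work happens on the Python side, which is the reason for pulling in the cross-language transform.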



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart

Review Comment:
   I think we should call this "Java multi-language quickstart for using Python transforms" and change the title/URL etc. accordingly.



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a Dataframe. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For the general use case, you can simply run your multi-language pipeline using

Review Comment:
   For running this example, you can ...



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a Dataframe. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For the general use case, you can simply run your multi-language pipeline using
+Gradle, as shown below.
+
+### Run with Dataflow runner
+
+The following script runs the example multi-language pipeline on Dataflow, using
+example text from a Cloud Storage bucket. You’ll need to adapt the script to
+your environment.
+
+```
+export OUTPUT_BUCKET=<bucket>
+export GCP_REGION=<region>
+export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
+export PYTHON_VERSION=<version>
+
+./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
+--runner=DataflowRunner \
+--output=gs://${OUTPUT_BUCKET}/count \
+--region=${GCP_REGION} \
+--experiments=use_runner_v2"
+```
+
+The pipeline outputs a file with the results to
+**gs://$OUTPUT_BUCKET/count-00000-of-00001**.
+
+Note: For Beam 2.40.0, you also need to set `sdkHarnessContainerImageOverrides`.

Review Comment:
   I don't think we necessarily need to document this. If we do, we should provide an example (how to push a container and use it).



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)

Review Comment:
   We should probably make it clearer here that production use cases should use DataframeTransform instead of the approach in this example.
   
   How about something like the following?
   
   "This example pipeline was developed to demonstrate the development of Java multi-language pipelines that use arbitrary Python cross-language transforms. For any production use cases that need to use the Dataframe API in Java, you should use the higher-level [DataframeTransform](<link>) transform instead."
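
For readers following the thread, it may help to see what the `func` callable passed in the example, `lambda df: df.groupby('word').sum()`, computes. The block below is only a plain-Java sketch of that grouping semantics and does not use Beam. The class `GroupBySumSketch`, the helper `sumByWord`, and the assumption that each row carries a word and a numeric count are hypothetical illustrations, not part of the PR or of the Beam API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupBySumSketch {

  // Hypothetical helper: sums the count of every row that shares the same word,
  // which is roughly what df.groupby('word').sum() does for a single numeric column.
  static Map<String, Long> sumByWord(List<Map.Entry<String, Long>> rows) {
    // A TreeMap keeps keys sorted, similar to the sorted group keys pandas returns by default.
    Map<String, Long> totals = new TreeMap<>();
    for (Map.Entry<String, Long> row : rows) {
      totals.merge(row.getKey(), row.getValue(), Long::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    // Assumed input shape: one (word, 1) row per extracted word.
    List<Map.Entry<String, Long>> rows =
        Arrays.asList(
            new SimpleEntry<>("king", 1L),
            new SimpleEntry<>("lear", 1L),
            new SimpleEntry<>("king", 1L));
    System.out.println(sumByWord(rows)); // prints {king=2, lear=1}
  }
}
```

In the pipeline under review, this aggregation runs inside the Python `DataframeTransform`. The Java side only supplies the callable as a string.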
   



##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a Dataframe. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For the general use case, you can simply run your multi-language pipeline using
+Gradle, as shown below.
+
+### Run with Dataflow runner
+
+The following script runs the example multi-language pipeline on Dataflow, using
+example text from a Cloud Storage bucket. You’ll need to adapt the script to
+your environment.
+
+```
+export OUTPUT_BUCKET=<bucket>
+export GCP_REGION=<region>
+export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
+export PYTHON_VERSION=<version>
+
+./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
+--runner=DataflowRunner \
+--output=gs://${OUTPUT_BUCKET}/count \
+--region=${GCP_REGION} \
+--experiments=use_runner_v2"
+```
+
+The pipeline outputs a file with the results to
+**gs://$OUTPUT_BUCKET/count-00000-of-00001**.
+
+Note: For Beam 2.40.0, you also need to set `sdkHarnessContainerImageOverrides`.
+
+## Advanced: Start an expansion service
+
+When building a job for a multi-language pipeline, Beam uses an
+[expansion service](/documentation/glossary/#expansion-service) to expand
+composite transforms. You must have at least one expansion service per remote
+SDK.
+
+In the general case, if you have a supported version of Python installed on your
+system, you can let `PythonExternalTransform` handle the details of creating and
+starting up the expansion service. But if you want to customize the environment
+or use transforms not available in the default Beam SDK, you might need to run
+your own expansion service.
+
+For example, to start the standard expansion service for a Python transform,
+[ExpansionServiceServicer](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py),
+follow these steps:
+
+1. Activate a Python virtual environment and install Apache Beam, as described
+   in the [Python quick start](/get-started/quickstart-py/).
+2. In the **beam/sdks/python** directory of the Beam source code, run the
+   following command:
+
+   ```
+   python apache_beam/runners/portability/expansion_service_main.py -p 18089 --fully_qualified_name_glob "*"
+   ```
+
+The command runs
+[expansion_service_main.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service_main.py), which starts the standard expansion service. You should see output similar to
+the following:
+
+```
+INFO:root:Default Python SDK image for environment is <IMAGE>

Review Comment:
   These logs may change in the future and may not be consistent with what we document here.
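
   One way to avoid depending on exact log text is to check that the expansion service port accepts connections. The sketch below is not part of this PR. It assumes the service was started locally with `-p 18089`, as in the command above, and the helper name `expansion_service_is_listening` is hypothetical.

```python
import socket


def expansion_service_is_listening(
    host: str = "localhost", port: int = 18089, timeout: float = 2.0
) -> bool:
    """Return True if something accepts TCP connections at host:port.

    This only confirms that a listener exists on the port. It does not
    verify that the listener is actually a Beam expansion service.
    """
    try:
        # create_connection raises an OSError subclass if the port is
        # closed, unreachable, or the attempt times out.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

   Calling `expansion_service_is_listening()` after running the startup command should return `True` once the service is up, whatever the log lines happen to say.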



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj merged pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
chamikaramj merged PR #22263:
URL: https://github.com/apache/beam/pull/22263




[GitHub] [beam] pcoet commented on a diff in pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on code in PR #22263:
URL: https://github.com/apache/beam/pull/22263#discussion_r922187587


##########
website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md:
##########
@@ -19,162 +19,185 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam Dataframe API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+There's also a higher-level cross-language [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+for Java, so you can use that instead of specifying the fully qualified name for
+the Python `DataframeTransform`.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python functions.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+the transform.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a Dataframe. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For the general use case, you can simply run your multi-language pipeline using
+Gradle, as shown below.
+
+### Run with Dataflow runner
+
+The following script runs the example multi-language pipeline on Dataflow, using
+example text from a Cloud Storage bucket. You’ll need to adapt the script to
+your environment.
+
+```
+export OUTPUT_BUCKET=<bucket>
+export GCP_REGION=<region>
+export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
+export PYTHON_VERSION=<version>
+
+./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
+--runner=DataflowRunner \
+--output=gs://${OUTPUT_BUCKET}/count \
+--region=${GCP_REGION} \
+--experiments=use_runner_v2"
+```
+
+The pipeline outputs a file with the results to
+**gs://$OUTPUT_BUCKET/count-00000-of-00001**.
+
+Note: For Beam 2.40.0, you also need to set `sdkHarnessContainerImageOverrides`.

Review Comment:
   Removed.





[GitHub] [beam] pcoet commented on pull request #22263: Rewrote Java multi-language pipeline quickstart

Posted by GitBox <gi...@apache.org>.
pcoet commented on PR #22263:
URL: https://github.com/apache/beam/pull/22263#issuecomment-1185574392

   Thanks, Cham! I made all the changes except for the title/URL. I think we should leave that as is (see my comment). But let me know...

