Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/15 23:19:10 UTC

[beam] branch master updated: Rewrote Java multi-language pipeline quickstart (#22263)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da1368a5bb Rewrote Java multi-language pipeline quickstart (#22263)
4da1368a5bb is described below

commit 4da1368a5bb4c96165a81c70e076e190df269c87
Author: David Huntsperger <56...@users.noreply.github.com>
AuthorDate: Fri Jul 15 16:19:05 2022 -0700

    Rewrote Java multi-language pipeline quickstart (#22263)
    
    * rewrote Java multi-language pipeline quickstart
    
    * reference higher-level DataframeTransform
    
    * fix one last awkward phrase
    
    * incorporated feedback
---
 .../sdks/java-multi-language-pipelines.md          | 331 +++++++++++----------
 1 file changed, 173 insertions(+), 158 deletions(-)

diff --git a/website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md b/website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md
index 33bd11a4787..93243bee9ce 100644
--- a/website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md
+++ b/website/www/site/content/en/documentation/sdks/java-multi-language-pipelines.md
@@ -19,162 +19,177 @@ limitations under the License.
 
 # Java multi-language pipelines quickstart
 
-> **Note:** This page is a work in progress. Please see [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) for full documentation.
-
-This page demonstrates how to write a Java pipeline that uses a Python cross-language transform.
-
-The goal of a cross-language pipeline is to incorporate transforms from one SDK (e.g. the Python SDK) into a pipeline written using another SDK (e.g. the Java SDK). This enables having already developed transforms (e.g. ML transforms in Python) and libraries (e.g. the vast library of IOs in Java), and strengths of certain languages at your disposal in whichever language you are more comfortable authoring pipelines while vastly expanding your toolkit in given language.
-
-In this section we will cover a specific use-case: incorporating a Python transform that does inference on a model but is part of a larger Java pipeline. The section is broken down into 2 parts:
-
-1. How to author the cross-language pipeline?
-1. How to run the cross-language pipeline?
-
-{{< language-switcher java py >}}
-
-## How to author the cross-language pipeline?
-
-This section digs into what changes when authoring a cross-language pipeline:
-
-1. "Classic" pipeline in Java
-1. External transform in Python
-1. Expansion server
-
-### "Classic" pipeline
-
-We start by developing an Apache Beam pipeline like we would normally do if you were using only one SDK (e.g. the Java SDK):
-
-{{< highlight java >}}
-public class CrossLanguageTransform extends PTransform<PCollection<String>, PCollection<String>> {
-    private static final String URN = "beam:transforms:xlang:pythontransform";
-
-    private static String expansionAddress;
-
-    public CrossLanguageTransform(String expansionAddress) {
-        this.expansionAddress = expansionAddress;
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-        PCollection<String> output =
-            input.apply(
-                "ExternalPythonTransform",
-                External.of(URN, new byte [] {}, this.expansionAddress)
-            );
-    }
-}
-
-public class CrossLanguagePipeline {
-    public static void main(String[] args) {
-        Pipeline p = Pipeline.create();
-
-        String expansionAddress = "localhost:9097"
-
-        PCollection<String> inputs = p.apply(Create.of("features { feature { key: 'country' value { bytes_list { value: 'Belgium' }}}}"));
-        input.apply(new CrossLanguageTransform(expansionAddress));
-
-        p.run().waitUntilFinish();
-    }
+This page provides a high-level overview of creating multi-language pipelines
+with the Apache Beam SDK for Java. For a more complete discussion of the topic,
+see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language
+and uses one or more transforms from another Beam SDK language. These transforms
+from another SDK are called *cross-language transforms*. Multi-language support
+makes pipeline components easier to share across the Beam SDKs and grows the
+pool of available transforms for all the SDKs.
+
+In the examples below, the multi-language pipeline is built with the Beam Java
+SDK, and the cross-language transform is built with the Beam Python SDK.
+
+## Prerequisites
+
+This quickstart is based on a Java example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+that counts words in a Shakespeare text. If you’d like to run the pipeline, you
+can clone or download the Beam repository and build the example from the source
+code.
+
+To build and run the example, you need a Java environment with the Beam Java SDK
+version 2.40.0 or later installed, and a Python environment. If you don’t
+already have these environments set up, first complete the
+[Apache Beam Java SDK Quickstart](/get-started/quickstart-java/) and the
+[Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+
+## Specify a cross-language transform
+
+The Java example pipeline uses the Python
+[DataframeTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/dataframe/transforms.py)
+as a cross-language transform. The transform is part of the
+[Beam DataFrame API](/documentation/dsls/dataframes/overview/) for working with
+pandas-like
+[DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+objects.
+
+To apply a cross-language transform, your pipeline must specify it. Python
+transforms are identified by their fully qualified name. For example,
+`DataframeTransform` can be found in the `apache_beam.dataframe.transforms`
+package, so its fully qualified name is
+`apache_beam.dataframe.transforms.DataframeTransform`.
+The example pipeline,
+[PythonDataframeWordCount](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java),
+passes this fully qualified name to
+[PythonExternalTransform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html).
+
+> **Note:** The example pipeline is intended to demonstrate the development of
+> Java multi-language pipelines that use arbitrary Python cross-language
+> transforms. For production use cases of the DataFrame API in Java, you should
+> use the higher-level
+> [DataframeTransform](https://github.com/apache/beam/blob/master/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/transforms/DataframeTransform.java)
+> instead.
+
+Here's the complete pipeline definition from the example:
+
+```java
+static void runWordCount(WordCountOptions options) {
+  Pipeline p = Pipeline.create(options);
+
+  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
+      .apply(ParDo.of(new ExtractWordsFn()))
+      .setRowSchema(ExtractWordsFn.SCHEMA)
+      .apply(
+          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
+                  "apache_beam.dataframe.transforms.DataframeTransform",
+                  options.getExpansionService())
+              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+              .withKwarg("include_indexes", true))
+      .apply(MapElements.via(new FormatAsTextFn()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()));
+
+  p.run().waitUntilFinish();
 }
-{{< /highlight >}}
-
-The main differences with authoring a classic pipeline and transform are
-
-- The PTransform uses the [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) transform.
-- This has a Uniform Resource Name (URN) which will identify the transform in your expansion service (more below).
-- The address on which the expansion service is running.
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#use-x-lang-transforms) for a deeper understanding of using external transforms.
-
-### External transform
-
-The transform we are trying to call from Java is defined in Python as follows:
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-URN = "beam:transforms:xlang:pythontransform"
-
-@ptransform.PTransform.register_urn(URN, None)
-class PythonTransform(ptransform.PTransform):
-    def __init__(self):
-        super().__init__()
-
-    def expand(self, pcoll):
-        return (pcoll
-                | "Input preparation"
-                    >> beam.Map(
-                        lambda input: google.protobuf.text_format.Parse(input, tf.train.Example())
-                    )
-                | "Get predictions" >> RunInference(
-                        model_spec_pb2.InferenceSpecType(
-                            saved_model_spec=model_spec_pb2.SavedModelSpec(
-                                model_path=model_path,
-                                signature_name=['serving_default']))))
-
-    def to_runner_api_parameter(self, unused_context):
-        return URN, None
-
-    def from_runner_api_parameter(
-        unused_ptransform, unused_paramter, unused_context):
-        return PythonTransform()
-{{< /highlight >}}
-
-Check the [documentation](https://beam.apache.org/documentation/programming-guide/#create-x-lang-transforms) for a deeper understanding of creating an external transform.
-
-### Expansion service
-
-The expansion service is written in the same language as the external transform. It takes care of injecting the transforms in your pipeline before submitting them to the Runner.
-
-{{< highlight java >}}
-Implemented in Python.
-{{< /highlight >}}
-
-{{< highlight py >}}
-def main(unused_argv):
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '-p', '--port', type=int, help='port on which to serve the job api')
-  options = parser.parse_args()
-  global server
-  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
-  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
-      expansion_service.ExpansionServiceServicer(
-          PipelineOptions(
-              ["--experiments", "beam_fn_api", "--sdk_location", "container"])), server)
-  server.add_insecure_port('localhost:{}'.format(options.port))
-  server.start()
-  _LOGGER.info('Listening for expansion requests at %d', options.port)
-
-  signal.signal(signal.SIGTERM, cleanup)
-  signal.signal(signal.SIGINT, cleanup)
-  signal.pause()
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  main(sys.argv)
-{{< /highlight >}}
-
-## How to run the cross-language pipeline?
-
-In this section, the steps to run a cross-language pipeline are set out:
-
-1. Start the **expansion service** with your Python transforms: `python expansion_service.py -p 9097`
-1. Start the **Job Server** which will translated into the stage that will run on your back-end or runner (e.g. Spark):
-
-   - From Apache Beam source code:
-     `./gradlew :runners:spark:job-server:runShadow`
-   - Using the pre-build Docker container:
-     `docker run -net=host apache/beam_spark_job_server`
-
-1. **Run pipeline**: ```mvn exec:java -Dexec.mainClass=CrossLanguagePipeline \
-    -Pportable-runner \
-    -Dexec.args=" \
-        --runner=PortableRunner \
-        --jobEndpoint=localhost:8099 \
-        --useExternal=true \
-        --expansionServiceURL=localhost:9097 \
-        --experiments=beam_fn_api"```
+```
+
+`PythonExternalTransform` is a wrapper for invoking external Python transforms.
+The
+[`from`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#from-java.lang.String-java.lang.String-)
+method accepts two strings: 1) the fully qualified transform name; 2) an
+optional address and port number for the expansion service. The method returns
+a stub for the Python cross-language transform that can be used directly in a
+Java pipeline.
+[`withKwarg`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/extensions/python/PythonExternalTransform.html#withKwarg-java.lang.String-java.lang.Object-)
+specifies a keyword argument for instantiating the Python cross-language
+transform. In this case, `withKwarg` is invoked twice, to specify a `func`
+argument and an `include_indexes` argument, and these arguments are passed to
+`DataframeTransform`. `PythonExternalTransform` also provides other ways to
+specify args and kwargs for Python cross-language transforms.
+
+To understand how this pipeline works, it’s helpful to look more closely at the
+first `withKwarg` invocation:
+
+```java
+.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
+```
+
+The argument to `PythonCallableSource.of` is a string representation of a Python
+lambda function. `DataframeTransform` takes as an argument a Python callable to
+apply to a `PCollection` as if it were a DataFrame. The `withKwarg` method lets
+you specify a Python callable in your Java pipeline. To learn more about passing
+a function to `DataframeTransform`, see
+[Embedding DataFrames in a pipeline](/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline).
+
+## Run the Java pipeline
+
+If you want to customize the environment or use transforms not available in the
+default Beam SDK, you might need to run your own expansion service. In such
+cases, [start the expansion service](#advanced-start-an-expansion-service)
+before running your pipeline.
+
+For this example, you can simply run your multi-language pipeline using
+Gradle, as shown below.
+
+### Run with Dataflow runner
+
+The following script runs the example multi-language pipeline on Dataflow, using
+example text from a Cloud Storage bucket. You’ll need to adapt the script to
+your environment.
+
+```
+export OUTPUT_BUCKET=<bucket>
+export GCP_REGION=<region>
+export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
+export PYTHON_VERSION=<version>
+
+./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
+--runner=DataflowRunner \
+--output=gs://${OUTPUT_BUCKET}/count \
+--region=${GCP_REGION} \
+--experiments=use_runner_v2"
+```
+
+The pipeline outputs a file with the results to
+**gs://$OUTPUT_BUCKET/count-00000-of-00001**.
+
+## Advanced: Start an expansion service
+
+When building a job for a multi-language pipeline, Beam uses an
+[expansion service](/documentation/glossary/#expansion-service) to expand
+composite transforms. You must have at least one expansion service per remote
+SDK.
+
+In the general case, if you have a supported version of Python installed on your
+system, you can let `PythonExternalTransform` handle the details of creating and
+starting up the expansion service. But if you want to customize the environment
+or use transforms not available in the default Beam SDK, you might need to run
+your own expansion service.
+
+For example, to start the standard Python expansion service,
+[ExpansionServiceServicer](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py),
+follow these steps:
+
+1. Activate a Python virtual environment and install Apache Beam, as described
+   in the [Apache Beam Python SDK Quickstart](/get-started/quickstart-py/).
+2. In the **beam/sdks/python** directory of the Beam source code, run the
+   following command:
+
+   ```
+   python apache_beam/runners/portability/expansion_service_main.py -p 18089 --fully_qualified_name_glob "*"
+   ```
+
+The command runs
+[expansion_service_main.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service_main.py), which starts the standard expansion service. When you use
+Gradle to run your Java pipeline, you can specify the expansion service with the
+`expansionService` option. For example: `--expansionService=localhost:18089`.
+
+## Next steps
+
+To learn more about Beam support for cross-language pipelines, see
+[Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines).
+To learn more about the Beam DataFrame API, see
+[Beam DataFrames overview](/documentation/dsls/dataframes/overview/).
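
A note on the `func` argument in the patch above: the string `lambda df: df.groupby('word').sum()` groups rows by the `word` column and adds up the remaining numeric columns. The plain-Java sketch below mimics that computation without any Beam dependency, as a rough illustration only. It assumes each row holds a word and a count of 1; that schema is not shown in this commit, and `GroupBySumSketch` and `WordRow` are hypothetical names that are not part of the example pipeline.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupBySumSketch {
  /** Hypothetical stand-in for a Beam Row with "word" and "count" fields (assumed schema). */
  static final class WordRow {
    final String word;
    final int count;

    WordRow(String word, int count) {
      this.word = word;
      this.count = count;
    }
  }

  /** Sums the counts per word, roughly what df.groupby('word').sum() does for two columns. */
  static Map<String, Integer> sumByWord(List<WordRow> rows) {
    // TreeMap is used only so the printed result has a deterministic key order.
    Map<String, Integer> totals = new TreeMap<>();
    for (WordRow row : rows) {
      totals.merge(row.word, row.count, Integer::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    List<WordRow> rows =
        List.of(new WordRow("king", 1), new WordRow("lear", 1), new WordRow("king", 1));
    System.out.println(sumByWord(rows)); // prints {king=2, lear=1}
  }
}
```

In the real pipeline this aggregation runs inside the Python SDK on a distributed `PCollection`, so the order of output elements is not guaranteed there; the sketch only shows the per-word arithmetic.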