Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/11/13 03:11:59 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #13317: [BEAM-11254] New documentation on multi-language pipelines

chamikaramj commented on a change in pull request #13317:
URL: https://github.com/apache/beam/pull/13317#discussion_r522561676



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
+
+## 13. Multi-language pipelines {#mulit-language-pipelines}
+
+Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the Apache Kafka connector and SQL transform from the Java SDK can be used in Python streaming pipelines. 

Review comment:
       Please add the following links for Kafka and SQL.
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql.py

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
+
+Pipelines that use transforms from more than one SDK-language are known as multi-language pipelines.
+
+### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
+
+To make transforms written in one language available to pipelines written in another language, an expansion service for that transform is used to create and inject the appropriate language-specific pipeline fragments into your pipeline. 
+
+In the following example, a Python pipeline written with the Apache Beam SDK for Python starts a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the Java dependencies needed to execute these transforms.
+
+![Diagram of multi-language pipeline execution flow.](/images/multi-language-pipelines-diagram.svg)
+
+At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.
+
+In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html) to illustrate how to create a cross-language transform for Java and a test example for Python.
+
+#### 13.1.1. Creating cross-language Java transforms
+
+To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
+
+**Implementing the interfaces**
+
+1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it is convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
+
+    > **Note:** `ExternalTransformBuilder` requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.
+
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);
+
+  /** Remaining property declarations omitted for clarity. */
+
+  abstract Read<K, V> build();
+
+  @Override
+  public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+      External.Configuration config) {
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (String topic : config.topics) {
+      listBuilder.add(topic);
+    }
+    setTopics(listBuilder.build());
+
+    /** Remaining property defaults omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+2. Register the transform as an external cross-language transform by defining a class that implements `ExternalTransformRegistrar`. You must annotate your class with the `AutoService` annotation to ensure that your transform is registered and instantiated properly by the expansion service.
+3. In your registrar class, define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
+4. From within your registrar class, define a configuration class for the parameters used during the initialization of your transform by the external SDK.
+
+    The following example from the KafkaIO transform shows how to implement steps two through four:
+
+    {{< highlight >}}
+@AutoService(ExternalTransformRegistrar.class)
+public static class External implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:kafka:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(
+        URN,
+        (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+            (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+  }
+
+  /** Parameters class to expose the Read transform to an external SDK. */
+  public static class Configuration {
+    private Map<String, String> consumerConfig;
+    private List<String> topics;
+
+    public void setConsumerConfig(Map<String, String> consumerConfig) {
+      this.consumerConfig = consumerConfig;
+    }
+
+    public void setTopics(List<String> topics) {
+      this.topics = topics;
+    }
+
+    /** Remaining properties omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+
+After you have implemented the `ExternalTransformBuilder` and `ExternalTransformRegistrar` interfaces, your transform can be registered and created successfully by the default Java expansion service.
+
+**Using the expansion service**
+
+Java has a default expansion service included and available in the Apache Beam Java SDK. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.
+
+Perform the following steps to start up a Java expansion service directly:
+
+{{< highlight >}}
+# Path to a JAR file that contains the transform to expand, cross-language specific utilities (builder, registrar, etc.), and dependencies.
+$ export EXPANSION_SERVICE_JAR=<My_expansion_service_JAR>
+
+$ export PORT_FOR_EXPANSION_SERVICE=12345
+
+$ java -jar $EXPANSION_SERVICE_JAR $PORT_FOR_EXPANSION_SERVICE
+{{< /highlight >}}
+
+When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the [BeamJarExpansionService](https://github.com/apache/beam/blob/e6db419c97532f416c8e94d91b22b9688ca837a0/sdks/python/apache_beam/transforms/external.py#L522) utility for starting up a Java expansion service using a JAR file.

Review comment:
       Is this link stable?
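
For readers following the expansion-service startup steps quoted in this hunk, here is a minimal Python sketch of how a wrapper might assemble that launch command. It assumes the service is started as `java -jar <jar> <port>`. The helper name `expansion_service_command` and the JAR file name are hypothetical and are not part of the Beam SDK.

```python
import shlex
from typing import List


def expansion_service_command(jar_path: str, port: int) -> List[str]:
    """Build the argv for launching an expansion service JAR.

    Hypothetical helper: it only assembles the command and does not start
    a process or check that the JAR exists.
    """
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return ["java", "-jar", jar_path, str(port)]


if __name__ == "__main__":
    # The JAR name here is a placeholder, not a real Beam artifact.
    cmd = expansion_service_command("my-expansion-service.jar", 12345)
    # Print a shell-quoted form; pass `cmd` to subprocess.Popen to run it.
    print(shlex.join(cmd))
```

Returning the command as a list rather than a single string avoids shell-quoting problems if the JAR path contains spaces.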

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
+Pipelines that use transforms from more than one SDK-language are known as multi-language pipelines.

Review comment:
       Emphasize the first introduction of the term "multi-language pipelines"?

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);

Review comment:
       We should probably mention somewhere that the examples were inspired by the Kafka transform.

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
+To make transforms written in one language available to pipelines written in another language, an expansion service for that transform is used to create and inject the appropriate language-specific pipeline fragments into your pipeline. 

Review comment:
       Emphasize the first introduction of the term "expansion service"?

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
+
+**Including dependencies**
+
+If your transform requires external libraries, you can include them by adding them to the classpath of the expansion service. After they are included in the classpath, they will be staged when your transform is expanded by the expansion service.
+
+**Writing SDK-specific wrappers**
+
+Your cross-language Java transform can be called through the lower-level `ExternalTransform` class in a multi-language pipeline (as described in the next section); however, if possible, you should create an SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction will make it easier for pipeline authors to use your transform.
+
+To create an SDK wrapper for use in a Python pipeline, do the following:
+
+1. Create a Python module for your cross-language transform.
+2. In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available [PayloadBuilder](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) classes.
+
+    The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java `ExternalTransformBuilder`. Parameter types are mapped across SDKs using a [Beam schema](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto). Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).
+
+    In the following example, kafka.py uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object.
+
+    {{< highlight >}}
+ReadFromKafkaSchema = typing.NamedTuple(
+  'ReadFromKafkaSchema',
+  [
+      ('consumer_config', typing.Mapping[unicode, unicode]),

Review comment:
       Mention that the parameters listed here map to instance variables of the `Configuration` Java class defined above.
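
To illustrate the mapping this comment refers to, here is a minimal Python sketch. It assumes the conversion is a plain underscore-to-camel-case rewrite, as the quoted paragraph describes. `to_camel_case` and `java_setter_name` are hypothetical helpers for illustration only, not Beam APIs, and the schema uses `str` in place of `unicode` on the assumption of Python 3.

```python
import typing


def to_camel_case(name: str) -> str:
    """Convert an underscore-separated name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


def java_setter_name(python_field: str) -> str:
    """Name of the Java setter a payload field is assumed to correspond to."""
    camel = to_camel_case(python_field)
    return "set" + camel[:1].upper() + camel[1:]


class ReadFromKafkaSchema(typing.NamedTuple):
    # Field names mirror the instance variables of the Java Configuration class.
    consumer_config: typing.Mapping[str, str]
    topics: typing.List[str]


if __name__ == "__main__":
    for field in ReadFromKafkaSchema._fields:
        print(field, "->", to_camel_case(field), "/", java_setter_name(field))
```

With this convention, `consumer_config` lines up with the `consumerConfig` field and `setConsumerConfig` setter in the Java `Configuration` class, and `topics` with `setTopics`.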

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
+Your cross-language Java transform can be called through the lower-level `ExternalTransform` class in a multi-language pipeline (as described in the next section); however, if possible, you should create an SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction makes it easier for pipeline authors to use your transform.

Review comment:
       Please link to https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py
   (or to the actual class if we can find a stable link).

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
+
+## 13. Multi-language pipelines {#mulit-language-pipelines}
+
+Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the Apache Kafka connector and SQL transform from the Java SDK can be used in Python streaming pipelines. 
+
+Pipelines that use transforms from more than one SDK-language are known as multi-language pipelines.
+
+### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
+
+To make transforms written in one language available to pipelines written in another language, an expansion service for that transform is used to create and inject the appropriate language-specific pipeline fragments into your pipeline. 
+
+In the following example, a Python pipeline written with the Apache Beam SDK for Python starts up a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the Java dependencies needed to execute these transforms.
+
+![Diagram of multi-language pipeline execution flow.](/images/multi-language-pipelines-diagram.svg)
+
+At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.
+
+In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html) to illustrate how to create a cross-language transform for Java and a test example for Python.
+
+#### 13.1.1. Creating cross-language Java transforms
+
+To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
+
+**Implementing the interfaces**
+
+1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it is convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
+
+    > **Note:** `ExternalTransformBuilder` requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.
+
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);
+
+  /** Remaining property declarations omitted for clarity. */
+
+  abstract Read<K, V> build();
+
+  @Override
+  public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+      External.Configuration config) {
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (String topic : config.topics) {
+      listBuilder.add(topic);
+    }
+    setTopics(listBuilder.build());
+
+    /** Remaining property defaults omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+2. Register the transform as an external cross-language transform by defining a class that implements `ExternalTransformRegistrar`. You must annotate your class with the `AutoService` annotation to ensure that your transform is registered and instantiated properly by the expansion service.
+3. In your registrar class, define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
+4. From within your registrar class, define a configuration class for the parameters used during the initialization of your transform by the external SDK.
+
+    The following example from the KafkaIO transform shows how to implement steps two through four:
+
+    {{< highlight >}}
+@AutoService(ExternalTransformRegistrar.class)
+public static class External implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:kafka:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(
+        URN,
+        (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+            (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+  }
+
+  /** Parameters class to expose the Read transform to an external SDK. */
+  public static class Configuration {
+    private Map<String, String> consumerConfig;
+    private List<String> topics;
+
+    public void setConsumerConfig(Map<String, String> consumerConfig) {
+      this.consumerConfig = consumerConfig;
+    }
+
+    public void setTopics(List<String> topics) {
+      this.topics = topics;
+    }
+
+    /** Remaining properties omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+
+After you have implemented the `ExternalTransformBuilder` and `ExternalTransformRegistrar` interfaces, your transform can be registered and created successfully by the default Java expansion service.
+
+**Using the expansion service**
+
+Java has a default expansion service included and available in the Apache Beam Java SDK. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.
+
+Perform the following steps to start up a Java expansion service directly:
+
+{{< highlight >}}
+# Path to a JAR file that contains the transform to expand, cross-language specific utilities (builder, registrar, etc.), and dependencies.
+$ export EXPANSION_SERVICE_JAR=<My_expansion_service_JAR>
+
+$ export PORT_FOR_EXPANSION_SERVICE=12345
+
+$ java -jar $EXPANSION_SERVICE_JAR $PORT_FOR_EXPANSION_SERVICE
+{{< /highlight >}}
+
+When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the [BeamJarExpansionService](https://github.com/apache/beam/blob/e6db419c97532f416c8e94d91b22b9688ca837a0/sdks/python/apache_beam/transforms/external.py#L522) utility for starting up a Java expansion service using a JAR file.
+
+**Including dependencies**
+
+If your transform requires external libraries, you can include them by adding them to the classpath of the expansion service. After they are included in the classpath, they will be staged when your transform is expanded by the expansion service.
+
+**Writing SDK-specific wrappers**
+
+Your cross-language Java transform can be called through the lower-level `ExternalTransform` class in a multi-language pipeline (as described in the next section); however, if possible, you should create an SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction makes it easier for pipeline authors to use your transform.
+
+To create an SDK wrapper for use in a Python pipeline, do the following:
+
+1. Create a Python module for your cross-language transform.
+2. In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available [PayloadBuilder](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) classes.
+
+    The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java `ExternalTransformBuilder`. Parameter types are mapped across SDKs using a [Beam schema](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto). Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).
+
+    In the following example, kafka.py uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object.
+
+    {{< highlight >}}
+ReadFromKafkaSchema = typing.NamedTuple(
+  'ReadFromKafkaSchema',
+  [
+      ('consumer_config', typing.Mapping[unicode, unicode]),
+      ('topics', typing.List[unicode])
+      # Other properties omitted for clarity.
+  ])
+payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...))
+    {{< /highlight >}}
+3. Start an expansion service unless one is specified by the pipeline creator. To use the `BeamJarExpansionService` utility from the Beam SDK, do the following:
+
+    1. Add a Gradle target to Beam that can be used to build a shaded expansion service JAR for the target Java transform. This target should produce a Beam JAR that contains all dependencies needed for expanding the Java transform and the JAR should be released with Beam.
+    2. In your Python module, instantiate `BeamJarExpansionService` with the Gradle target.
+
+        {{< highlight >}}
+    expansion_service = BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+        {{< /highlight >}}
+4. Add a Python wrapper transform class that extends [ExternalTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py). Pass the payload and expansion service defined above as parameters to the constructor of the `ExternalTransform` parent class.
+
+#### 13.1.2. Creating cross-language Python transforms
+
+To make your Python transform usable with different SDK languages, you must create a wrapper class that registers an existing Python transform as a cross-language transform for use with the Python expansion service and calls into that existing transform to perform its intended operation.
+
+**Defining the wrapper class**

Review comment:
       I think we should call this something other than the "wrapper class", since we already use that term for the language wrappers defined to use cross-language transforms in other SDKs. How about "Python module"?

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
+
+## 13. Multi-language pipelines {#mulit-language-pipelines}
+
+Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the Apache Kafka connector and SQL transform from the Java SDK can be used in Python streaming pipelines. 
+
+Pipelines that use transforms from more than one SDK-language are known as multi-language pipelines.
+
+### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
+
+To make transforms written in one language available to pipelines written in another language, an expansion service for that transform is used to create and inject the appropriate language-specific pipeline fragments into your pipeline. 
+
+In the following example, a Python pipeline written with the Apache Beam SDK for Python starts up a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the Java dependencies needed to execute these transforms.
+
+![Diagram of multi-language pipeline execution flow.](/images/multi-language-pipelines-diagram.svg)
+
+At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.
+
+In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html) to illustrate how to create a cross-language transform for Java and a test example for Python.
+
+#### 13.1.1. Creating cross-language Java transforms
+
+To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
+
+**Implementing the interfaces**
+
+1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it is convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
+
+    > **Note:** `ExternalTransformBuilder` requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.
+
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);
+
+  /** Remaining property declarations omitted for clarity. */
+
+  abstract Read<K, V> build();
+
+  @Override
+  public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+      External.Configuration config) {
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (String topic : config.topics) {
+      listBuilder.add(topic);
+    }
+    setTopics(listBuilder.build());
+
+    /** Remaining property defaults omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+2. Register the transform as an external cross-language transform by defining a class that implements `ExternalTransformRegistrar`. You must annotate your class with the `AutoService` annotation to ensure that your transform is registered and instantiated properly by the expansion service.
+3. In your registrar class, define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
+4. From within your registrar class, define a configuration class for the parameters used during the initialization of your transform by the external SDK.
+
+    The following example from the KafkaIO transform shows how to implement steps two through four:
+
+    {{< highlight >}}
+@AutoService(ExternalTransformRegistrar.class)
+public static class External implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:kafka:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(
+        URN,
+        (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+            (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+  }
+
+  /** Parameters class to expose the Read transform to an external SDK. */
+  public static class Configuration {
+    private Map<String, String> consumerConfig;
+    private List<String> topics;
+
+    public void setConsumerConfig(Map<String, String> consumerConfig) {
+      this.consumerConfig = consumerConfig;
+    }
+
+    public void setTopics(List<String> topics) {
+      this.topics = topics;
+    }
+
+    /** Remaining properties omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+
+After you have implemented the `ExternalTransformBuilder` and `ExternalTransformRegistrar` interfaces, your transform can be registered and created successfully by the default Java expansion service.
+
+**Using the expansion service**
+
+Java has a default expansion service included and available in the Apache Beam Java SDK. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.
+
+Perform the following steps to start up a Java expansion service directly:
+
+{{< highlight >}}
+# Path to a JAR file that contains the transform to expand, cross-language specific utilities (builder, registrar, etc.), and dependencies.
+$ export EXPANSION_SERVICE_JAR=<My_expansion_service_JAR>
+
+$ export PORT_FOR_EXPANSION_SERVICE=12345
+
+$ java -jar $EXPANSION_SERVICE_JAR $PORT_FOR_EXPANSION_SERVICE
+{{< /highlight >}}
+
+When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the [BeamJarExpansionService](https://github.com/apache/beam/blob/e6db419c97532f416c8e94d91b22b9688ca837a0/sdks/python/apache_beam/transforms/external.py#L522) utility for starting up a Java expansion service using a JAR file.
+
+**Including dependencies**
+
+If your transform requires external libraries, you can include them by adding them to the classpath of the expansion service. After they are included in the classpath, they will be staged when your transform is expanded by the expansion service.
+
+**Writing SDK-specific wrappers**
+
+Your cross-language Java transform can be called through the lower-level `ExternalTransform` class in a multi-language pipeline (as described in the next section); however, if possible, you should create an SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction makes it easier for pipeline authors to use your transform.
+
+To create an SDK wrapper for use in a Python pipeline, do the following:
+
+1. Create a Python module for your cross-language transform.
+2. In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available [PayloadBuilder](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) classes.
+
+    The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java `ExternalTransformBuilder`. Parameter types are mapped across SDKs using a [Beam schema](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto). Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).
+
+    In the following example, kafka.py uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object.
+
+    {{< highlight >}}
+ReadFromKafkaSchema = typing.NamedTuple(
+  'ReadFromKafkaSchema',
+  [
+      ('consumer_config', typing.Mapping[unicode, unicode]),
+      ('topics', typing.List[unicode])
+      # Other properties omitted for clarity.
+  ])
+payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...))
+    {{< /highlight >}}
+3. Start an expansion service unless one is specified by the pipeline creator. To use the `BeamJarExpansionService` utility from the Beam SDK, do the following:

Review comment:
       `BeamJarExpansionService` can only be used for wrappers defined in Beam, so maybe expand this to the following to clarify:
   
   "Start an expansion service unless one is specified by the pipeline creator.
   
   The Beam Python SDK provides a utility, `BeamJarExpansionService`, for easily starting up an expansion service based on a JAR released with Beam. To use this, do the following:"

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
+
+## 13. Multi-language pipelines {#mulit-language-pipelines}
+
+Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the Apache Kafka connector and SQL transform from the Java SDK can be used in Python streaming pipelines. 
+
+Pipelines that use transforms from more than one SDK-language are known as multi-language pipelines.
+
+### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
+
+To make transforms written in one language available to pipelines written in another language, an expansion service for that transform is used to create and inject the appropriate language-specific pipeline fragments into your pipeline. 
+
+In the following example, a Python pipeline written with the Apache Beam SDK for Python starts up a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the Java dependencies needed to execute these transforms.
+
+![Diagram of multi-language pipeline execution flow.](/images/multi-language-pipelines-diagram.svg)
+
+At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.
+
+In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html) to illustrate how to create a cross-language transform for Java and a test example for Python.
+
+#### 13.1.1. Creating cross-language Java transforms
+
+To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
+
+**Implementing the interfaces**
+
+1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it is convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
+
+    > **Note:** `ExternalTransformBuilder` requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.
+
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);
+
+  /** Remaining property declarations omitted for clarity. */
+
+  abstract Read<K, V> build();
+
+  @Override
+  public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+      External.Configuration config) {
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (String topic : config.topics) {
+      listBuilder.add(topic);
+    }
+    setTopics(listBuilder.build());
+
+    /** Remaining property defaults omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+2. Register the transform as an external cross-language transform by defining a class that implements `ExternalTransformRegistrar`. You must annotate your class with the `AutoService` annotation to ensure that your transform is registered and instantiated properly by the expansion service.
+3. In your registrar class, define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
+4. From within your registrar class, define a configuration class for the parameters used during the initialization of your transform by the external SDK.
+
+    The following example from the KafkaIO transform shows how to implement steps two through four:
+
+    {{< highlight >}}
+@AutoService(ExternalTransformRegistrar.class)
+public static class External implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:kafka:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(
+        URN,
+        (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+            (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+  }
+
+  /** Parameters class to expose the Read transform to an external SDK. */
+  public static class Configuration {
+    private Map<String, String> consumerConfig;
+    private List<String> topics;
+
+    public void setConsumerConfig(Map<String, String> consumerConfig) {
+      this.consumerConfig = consumerConfig;
+    }
+
+    public void setTopics(List<String> topics) {
+      this.topics = topics;
+    }
+
+    /** Remaining properties omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+
+After you have implemented the `ExternalTransformBuilder` and `ExternalTransformRegistrar` interfaces, your transform can be registered and created successfully by the default Java expansion service.
+
+**Using the expansion service**
+
+Java has a default expansion service included and available in the Apache Beam Java SDK. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.
+
+Perform the following steps to start up a Java expansion service directly:
+
+{{< highlight >}}
+# Path to a JAR file that contains the transform to expand, cross-language specific utilities (builder, registrar, etc.), and dependencies.
+$ export EXPANSION_SERVICE_JAR=<My_expansion_service_JAR>
+
+$ export PORT_FOR_EXPANSION_SERVICE=12345
+
+$ java -jar $EXPANSION_SERVICE_JAR $PORT_FOR_EXPANSION_SERVICE
+{{< /highlight >}}
+
+When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the [BeamJarExpansionService](https://github.com/apache/beam/blob/e6db419c97532f416c8e94d91b22b9688ca837a0/sdks/python/apache_beam/transforms/external.py#L522) utility for starting up a Java expansion service using a JAR file.
+
+**Including dependencies**
+
+If your transform requires external libraries, you can include them by adding them to the classpath of the expansion service. After they are included in the classpath, they will be staged when your transform is expanded by the expansion service.
+
+**Writing SDK-specific wrappers**
+
+Your cross-language Java transform can be called through the lower-level `ExternalTransform` class in a multi-language pipeline (as described in the next section); however, if possible, you should create an SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction makes it easier for pipeline authors to use your transform.
+
+To create an SDK wrapper for use in a Python pipeline, do the following:
+
+1. Create a Python module for your cross-language transform.
+2. In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available [PayloadBuilder](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) classes.
+
+    The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java `ExternalTransformBuilder`. Parameter types are mapped across SDKs using a [Beam schema](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto). Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).
+
+    In the following example, kafka.py uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object.
+
+    {{< highlight >}}
+ReadFromKafkaSchema = typing.NamedTuple(
+  'ReadFromKafkaSchema',
+  [
+      ('consumer_config', typing.Mapping[unicode, unicode]),
+      ('topics', typing.List[unicode])
+      # Other properties omitted for clarity.
+  ])

Review comment:
       Makes sense.
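
   For anyone following the hunk above: the name mapping it describes (Python underscore-separated parameter names converted to Java camel-case) can be sketched in a few lines. This is only an illustration of the convention, not Beam's actual implementation; `to_camel_case` is a hypothetical helper, and the two field names are the ones shown in the `ReadFromKafkaSchema` example.

```python
def to_camel_case(name: str) -> str:
    """Convert an underscore-separated name to lower camel-case.

    Hypothetical helper for illustration only; not part of the Beam API.
    """
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


# Field names taken from the ReadFromKafkaSchema example in the hunk.
python_fields = ["consumer_config", "topics"]

# Property names as they would need to appear on the Java configuration POJO.
java_properties = [to_camel_case(field) for field in python_fields]

# Matching setter names on the Java side (setConsumerConfig, setTopics).
java_setters = ["set" + prop[0].upper() + prop[1:] for prop in java_properties]

print(java_properties)
print(java_setters)
```

   Under this convention, `consumer_config` in the Python payload lines up with `setConsumerConfig` on `External.Configuration`, which is why the payload field names and the POJO setters have to be kept in sync.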

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,282 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
+
+## 13. Multi-language pipelines {#mulit-language-pipelines}
+
+Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the Apache Kafka connector and SQL transform from the Java SDK can be used in Python streaming pipelines. 
+
+Pipelines that use transforms from more than one SDK language are known as multi-language pipelines.
+
+### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
+
+To make transforms written in one language available to pipelines written in another language, an expansion service for that transform is used to create and inject the appropriate language-specific pipeline fragments into your pipeline. 
+
+In the following example, a Python pipeline written with the Apache Beam SDK for Python starts up a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the Java dependencies needed to execute these transforms.
+
+![Diagram of multi-language pipeline execution flow.](/images/multi-language-pipelines-diagram.svg)
+
+At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.
+
+In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html) to illustrate how to create a cross-language transform for Java and a test example for Python.
+
+#### 13.1.1. Creating cross-language Java transforms
+
+To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
+
+**Implementing the interfaces**
+
+1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it is convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
+
+    > **Note:** `ExternalTransformBuilder` requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.
+
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);
+
+  /** Remaining property declarations omitted for clarity. */
+
+  abstract Read<K, V> build();
+
+  @Override
+  public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+      External.Configuration config) {
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (String topic : config.topics) {
+      listBuilder.add(topic);
+    }
+    setTopics(listBuilder.build());
+
+    /** Remaining property defaults omitted for clarity. */
+
+    return build();
+  }
+}
+    {{< /highlight >}}
+2. Register the transform as an external cross-language transform by defining a class that implements `ExternalTransformRegistrar`. You must annotate your class with the `AutoService` annotation to ensure that your transform is registered and instantiated properly by the expansion service.
+3. In your registrar class, define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
+4. From within your registrar class, define a configuration class for the parameters used during the initialization of your transform by the external SDK.
+
+    The following example from the KafkaIO transform shows how to implement steps two through four:
+
+    {{< highlight >}}
+@AutoService(ExternalTransformRegistrar.class)
+public static class External implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:kafka:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+    return ImmutableMap.of(
+        URN,
+        (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+            (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+  }
+
+  /** Parameters class to expose the Read transform to an external SDK. */
+  public static class Configuration {
+    private Map<String, String> consumerConfig;
+    private List<String> topics;
+
+    public void setConsumerConfig(Map<String, String> consumerConfig) {
+      this.consumerConfig = consumerConfig;
+    }
+
+    public void setTopics(List<String> topics) {
+      this.topics = topics;
+    }
+
+    /** Remaining properties omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+
+After you have implemented the `ExternalTransformBuilder` and `ExternalTransformRegistrar` interfaces, your transform can be registered and created successfully by the default Java expansion service.
+
+**Using the expansion service**
+
+Java has a default expansion service included and available in the Apache Beam Java SDK. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.
+
+Perform the following steps to start up a Java expansion service directly:
+
+{{< highlight >}}
+# Path to a JAR file that contains the transform to expand, cross-language specific utilities (builder, registrar, etc.), and dependencies.
+$ export EXPANSION_SERVICE_JAR=<My_expansion_service_JAR>
+
+$ export PORT_FOR_EXPANSION_SERVICE=12345
+
+$ java -jar $EXPANSION_SERVICE_JAR $PORT_FOR_EXPANSION_SERVICE
+{{< /highlight >}}
+
+SDKs may provide utilities for starting up an expansion service, which are useful when you create SDK-specific wrappers for your transform. For example, the Python SDK provides the [BeamJarExpansionService](https://github.com/apache/beam/blob/e6db419c97532f416c8e94d91b22b9688ca837a0/sdks/python/apache_beam/transforms/external.py#L522) utility for starting up a Java expansion service using a JAR file.
+
+**Including dependencies**
+
+If your transform requires external libraries, you can include them by adding them to the classpath of the expansion service. After they are included in the classpath, they will be staged when your transform is expanded by the expansion service.
+
+**Writing SDK-specific wrappers**
+
+Your cross-language Java transform can be called through the lower-level `ExternalTransform` class in a multi-language pipeline (as described in the next section); however, if possible, you should create an SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction will make it easier for pipeline authors to use your transform.
+
+To create an SDK wrapper for use in a Python pipeline, do the following:
+
+1. Create a Python module for your cross-language transform.
+2. In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available [PayloadBuilder](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) classes.
+
+    The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java `ExternalTransformBuilder`. Parameter types are mapped across SDKs using a [Beam schema](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto). Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).
+
+    In the following example, kafka.py uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object.
+
+    {{< highlight >}}
+ReadFromKafkaSchema = typing.NamedTuple(
+  'ReadFromKafkaSchema',
+  [
+      ('consumer_config', typing.Mapping[unicode, unicode]),
+      ('topics', typing.List[unicode])
+      # Other properties omitted for clarity.
+  ])
+payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...))
+    {{< /highlight >}}
+3. Start an expansion service unless one is specified by the pipeline creator. To use the `BeamJarExpansionService` utility from the Beam SDK, do the following:
+
+    1. Add a Gradle target to Beam that can be used to build a shaded expansion service JAR for the target Java transform. This target should produce a Beam JAR that contains all dependencies needed for expanding the Java transform and the JAR should be released with Beam.
+    2. In your Python module, instantiate `BeamJarExpansionService` with the Gradle target.
+
+        {{< highlight >}}
+    expansion_service = BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+        {{< /highlight >}}
+4. Add a Python wrapper transform class that extends [ExternalTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py). Pass the payload and expansion service defined above as parameters to the constructor of the `ExternalTransform` parent class.
+
+#### 13.1.2. Creating cross-language Python transforms
+
+To make your Python transform usable with different SDK languages, you must create a wrapper class that registers an existing Python transform as a cross-language transform for use with the Python expansion service and calls into that existing transform to perform its intended operation.
+
+**Defining the wrapper class**
+
+1. Define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
+
+    {{< highlight >}}
+TEST_COMPK_URN = "beam:transforms:xlang:test:compk"
+    {{< /highlight >}}
+2. For an existing Python transform, create a new wrapper class to register the URN with the Python expansion service.
+
+    {{< highlight >}}
+@ptransform.PTransform.register_urn(TEST_COMPK_URN, None)
+class CombinePerKeyTransform(ptransform.PTransform):
+    {{< /highlight >}}
+3. From within the class, define an expand method that takes an input PCollection, runs the Python transform, and then returns the output PCollection.
+
+    {{< highlight >}}
+def expand(self, pcoll):
+    return pcoll \
+        | beam.CombinePerKey(sum).with_output_types(
+              typing.Tuple[unicode, int])
+    {{< /highlight >}}
+4. As with other Python transforms, define a `to_runner_api_parameter` method that returns the URN.
+
+    {{< highlight >}}
+def to_runner_api_parameter(self, unused_context):
+    return TEST_COMPK_URN, None
+    {{< /highlight >}}
+5. Define a static `from_runner_api_parameter` method that returns an instantiation of the cross-language Python transform.
+
+    {{< highlight >}}
+@staticmethod
+def from_runner_api_parameter(
+      unused_ptransform, unused_parameter, unused_context):
+    return CombinePerKeyTransform()
+    {{< /highlight >}}
+
+**Using the expansion service**
+
+Python has a default expansion service included and available in the Apache Beam SDK. You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.
+
+Perform the following steps to start up the default Python expansion service directly:
+
+1. Create a virtual environment and [install the Apache Beam SDK](https://beam.apache.org/get-started/quickstart-py/).
+2. Start the Python SDK’s expansion service with a specified port.
+
+    {{< highlight >}}
+$ export PORT_FOR_EXPANSION_SERVICE=12345
+$ python -m apache_beam.runners.portability.expansion_service -p $PORT_FOR_EXPANSION_SERVICE
+    {{< /highlight >}}
+
+**Including dependencies**
+
+If your transform requires external libraries, make sure those dependencies are already specified in a custom container for the SDK harness. Core Apache Beam dependencies are already included.
+
+### 13.2. Using cross-language transforms {#use-x-lang-transforms}
+
+Depending on the SDK language of the pipeline, you can use either a high-level SDK wrapper class or a low-level transform class to access a cross-language transform.
+
+#### 13.2.1. Using cross-language transforms in a Java pipeline
+
+Currently, to access cross-language transforms from the Java SDK, you have to use the lower-level [External](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) class.
+
+**Using the External class**
+
+1. Make sure you have any runtime environment dependencies (such as the JRE) installed on your local machine, either directly or through a container. See the expansion service section for more details.
+
+    > **Note:** When including Python transforms from within a Java pipeline, all Python dependencies have to be baked into the SDK harness container.
+2. If an expansion service is not already available, start one up for the SDK in the language of the transform you're trying to consume.
+
+    Make sure the transform you are trying to use is available and can be used by the expansion service.
+3. Include [External.of(...)](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java) when instantiating your pipeline. Reference the URN, payload, and expansion service. For examples, see the [cross-language transform test suite](https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java).
+4. After the job has been submitted to the Beam runner, shut down the expansion service by terminating the expansion service process.
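
The start-up and shut-down steps above (steps 2 and 4) can be sketched as a small process-management helper. This is a sketch under stated assumptions, written in Python purely for illustration: `running_service` and `python_expansion_service_command` are hypothetical helper names, not Beam APIs, and the command line is the one shown for the default Python expansion service in section 13.1.2.

```python
import contextlib
import subprocess
import sys


@contextlib.contextmanager
def running_service(command):
    """Start `command` as a child process and terminate it on exit."""
    process = subprocess.Popen(command)
    try:
        yield process
    finally:
        # Step 4: shut the service down once the job has been submitted.
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()


def python_expansion_service_command(port):
    """Build the command line for the default Python expansion service."""
    return [
        sys.executable, "-m",
        "apache_beam.runners.portability.expansion_service",
        "-p", str(port),
    ]


# Hypothetical usage (requires the Apache Beam SDK to be installed):
# with running_service(python_expansion_service_command(12345)):
#     ...  # construct the pipeline and submit the job here
```

Wrapping the service in a context manager keeps it alive for exactly as long as the pipeline is being constructed and submitted, and terminates it even if submission raises an exception.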
+
+#### 13.2.2. Using cross-language transforms in a Python pipeline
+
+If a Python-specific wrapper for a cross-language transform is available, use that; otherwise, you have to use the lower-level `ExternalTransform` class to access the transform.

Review comment:
       Add a link to https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py



