Posted to commits@beam.apache.org by me...@apache.org on 2017/11/06 21:17:24 UTC

[beam-site] branch asf-site updated (0679868 -> 6dac6ca)

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


    from 0679868  This closes #332: New web site navigation
     add db37734  [BEAM-1934] Add more CoGroupByKey content/examples
     add 445fa90  Update with Java snippet tags
     add 593691c  This closes #302
     new 6dac6ca  Prepare repository for deployment.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/documentation/programming-guide/index.html | 243 ++++++++++++++++++---
 src/documentation/programming-guide.md             | 145 +++++++++---
 2 files changed, 319 insertions(+), 69 deletions(-)
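The [BEAM-1934] change listed above adds CoGroupByKey content to the programming guide. For Python, the guide describes the output as one (key, dict) element per distinct key, where each dict maps a tag to an iterable of that key's values in the corresponding input. The following plain-Python model is a sketch of those semantics only, not Beam code. `co_group_by_key` is a hypothetical helper that works on in-memory lists. It sorts keys and values so the output is deterministic, which a real runner does not guarantee.

```python
from collections import defaultdict


def co_group_by_key(tagged_inputs):
    """Hypothetical in-memory model of CoGroupByKey (not the Beam API).

    tagged_inputs maps a tag (e.g. 'emails') to a list of (key, value) pairs.
    Returns a list of (key, {tag: [values]}) tuples, one per distinct key
    found in any input. Every tag appears in every dict, with an empty
    list when that input had no values for the key.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    # Sorting is only for deterministic output; Beam makes no ordering promise.
    return [
        (key, {tag: sorted(values) for tag, values in grouped[key].items()})
        for key in sorted(grouped)
    ]


# Same sample data as the guide's emails/phones example.
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

results = co_group_by_key({'emails': emails_list, 'phones': phones_list})
for key, info in results:
    print(key, info)
```

With this data the model produces the same four elements as the `results` list in the guide's Python snippet. Keys that appear in only one input, such as "james" and "julia", still get an entry with an empty list for the other tag.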

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam-site] 01/01: Prepare repository for deployment.

Posted by me...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 6dac6ca7b63b309952130dee69ba878bb68f4a73
Author: Mergebot <me...@apache.org>
AuthorDate: Mon Nov 6 21:17:22 2017 +0000

    Prepare repository for deployment.
---
 content/documentation/programming-guide/index.html | 243 ++++++++++++++++++---
 1 file changed, 208 insertions(+), 35 deletions(-)
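The diff below describes CoGroupByKey as a relational join. However, its output is grouped per key rather than flattened into joined rows. As a hedged sketch, a per-element function like the hypothetical `inner_join_rows` below could turn each grouped element into SQL-style inner-join rows. In a Beam Python pipeline you would presumably pass it to `beam.FlatMap`; that wiring is an assumption, not code from the guide. Everything here is plain Python so it runs without Beam.

```python
import itertools


def inner_join_rows(name_info):
    """Hypothetical per-element function, e.g. for use inside a FlatMap.

    Takes one co-grouped element (key, {'emails': [...], 'phones': [...]})
    and yields one (key, email, phone) row for every email/phone
    combination. A key missing from either side yields no rows, which is
    inner-join behavior.
    """
    name, info = name_info
    for email, phone in itertools.product(info['emails'], info['phones']):
        yield (name, email, phone)


# Co-grouped data in the shape the guide shows for CoGroupByKey output.
results = [
    ('amy', {'emails': ['amy@example.com'],
             'phones': ['111-222-3333', '333-444-5555']}),
    ('carl', {'emails': ['carl@email.com', 'carl@example.com'],
              'phones': ['444-555-6666']}),
    ('james', {'emails': [], 'phones': ['222-333-4444']}),
    ('julia', {'emails': ['julia@example.com'], 'phones': []}),
]

rows = [row for element in results for row in inner_join_rows(element)]
for row in rows:
    print(row)
```

Here "james" and "julia" drop out because one side is empty. Changing that rule, for example by yielding `None` in place of a missing phone, would give left-join or outer-join behavior instead.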

diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html
index b00ea56..6df1d91 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -303,7 +303,10 @@ how to implement Beam concepts in your pipelines.</p>
               <li><a href="#lightweight-dofns-and-other-abstractions" id="markdown-toc-lightweight-dofns-and-other-abstractions">4.2.1.3. Lightweight DoFns and other abstractions</a></li>
             </ul>
           </li>
-          <li><a href="#groupbykey" id="markdown-toc-groupbykey">4.2.2. GroupByKey</a></li>
+          <li><a href="#groupbykey" id="markdown-toc-groupbykey">4.2.2. GroupByKey</a>            <ul>
+              <li><a href="#groupbykey-and-unbounded-pcollections" id="markdown-toc-groupbykey-and-unbounded-pcollections">4.2.2.1 GroupByKey and unbounded PCollections</a></li>
+            </ul>
+          </li>
           <li><a href="#cogroupbykey" id="markdown-toc-cogroupbykey">4.2.3. CoGroupByKey</a></li>
           <li><a href="#combine" id="markdown-toc-combine">4.2.4. Combine</a>            <ul>
               <li><a href="#simple-combinations-using-simple-functions" id="markdown-toc-simple-combinations-using-simple-functions">4.2.4.1. Simple combinations using simple functions</a></li>
@@ -370,7 +373,7 @@ how to implement Beam concepts in your pipelines.</p>
   <li><a href="#windowing" id="markdown-toc-windowing">7. Windowing</a>    <ul>
       <li><a href="#windowing-basics" id="markdown-toc-windowing-basics">7.1. Windowing basics</a>        <ul>
           <li><a href="#windowing-constraints" id="markdown-toc-windowing-constraints">7.1.1. Windowing constraints</a></li>
-          <li><a href="#using-windowing-with-bounded-pcollections" id="markdown-toc-using-windowing-with-bounded-pcollections">7.1.2. Using windowing with bounded PCollections</a></li>
+          <li><a href="#windowing-with-bounded-pcollections" id="markdown-toc-windowing-with-bounded-pcollections">7.1.2. Windowing with bounded PCollections</a></li>
         </ul>
       </li>
       <li><a href="#provided-windowing-functions" id="markdown-toc-provided-windowing-functions">7.2. Provided windowing functions</a>        <ul>
@@ -1224,47 +1227,217 @@ tree, [2]
 <p>Thus, <code class="highlighter-rouge">GroupByKey</code> represents a transform from a multimap (multiple keys to
 individual values) to a uni-map (unique keys to collections of values).</p>
 
+<h5 id="groupbykey-and-unbounded-pcollections">4.2.2.1 GroupByKey and unbounded PCollections</h5>
+
+<p>If you are using unbounded <code class="highlighter-rouge">PCollection</code>s, you must use either <a href="#setting-your-pcollections-windowing-function">non-global
+windowing</a> or an
+<a href="#triggers">aggregation trigger</a> in order to perform a <code class="highlighter-rouge">GroupByKey</code> or
+<a href="#cogroupbykey">CoGroupByKey</a>. This is because, under the default global window and trigger, <code class="highlighter-rouge">GroupByKey</code> or
+<code class="highlighter-rouge">CoGroupByKey</code> must wait for all the data with a certain key to be collected,
+but an unbounded collection never stops receiving data. Windowing and/or triggers
+allow grouping to operate on logical, finite bundles of data within the
+unbounded data streams.</p>
+
+<p>If you do apply <code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">CoGroupByKey</code> to a group of unbounded
+<code class="highlighter-rouge">PCollection</code>s without setting either a non-global windowing strategy, a trigger
+strategy, or both for each collection, Beam throws an <code class="highlighter-rouge">IllegalStateException</code>
+at pipeline construction time.</p>
+
+<p>When using <code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">CoGroupByKey</code> to group <code class="highlighter-rouge">PCollection</code>s that have a
+<a href="#windowing">windowing strategy</a> applied, all of the <code class="highlighter-rouge">PCollection</code>s you want to
+group <em>must use the same windowing strategy</em> and window sizing. For example, all
+of the collections you are merging must use (hypothetically) identical 5-minute
+fixed windows, or 4-minute sliding windows starting every 30 seconds.</p>
+
+<p>If your pipeline attempts to use <code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">CoGroupByKey</code> to merge
+<code class="highlighter-rouge">PCollection</code>s with incompatible windows, Beam throws an
+<code class="highlighter-rouge">IllegalStateException</code> at pipeline construction time.</p>
+
 <h4 id="cogroupbykey">4.2.3. CoGroupByKey</h4>
 
-<p><code class="highlighter-rouge">CoGroupByKey</code> joins two or more key/value <code class="highlighter-rouge">PCollection</code>s that have the same key
-type, and then emits a collection of <code class="highlighter-rouge">KV&lt;K, CoGbkResult&gt;</code> pairs. <a href="/documentation/pipelines/design-your-pipeline/#multiple-sources">Design Your
-Pipeline</a>
+<p><code class="highlighter-rouge">CoGroupByKey</code> performs a relational join of two or more key/value
+<code class="highlighter-rouge">PCollection</code>s that have the same key type.
+<a href="/documentation/pipelines/design-your-pipeline/#multiple-sources">Design Your Pipeline</a>
 shows an example pipeline that uses a join.</p>
 
-<p>Given the input collections below:</p>
-<div class="highlighter-rouge"><pre class="highlight"><code>// collection 1
-user1, address1
-user2, address2
-user3, address3
-
-// collection 2
-user1, order1
-user1, order2
-user2, order3
-guest, order4
-...
+<p>Consider using <code class="highlighter-rouge">CoGroupByKey</code> if you have multiple data sets that provide
+information about related things. For example, let’s say you have two different
+files with user data: one file has names and email addresses; the other file
+has names and phone numbers. You can join those two data sets, using the user
+name as a common key and the other data as the associated values. After the
+join, you have one data set that contains all of the information (email
+addresses and phone numbers) associated with each name.</p>
+
+<p>If you are using unbounded <code class="highlighter-rouge">PCollection</code>s, you must use either <a href="#setting-your-pcollections-windowing-function">non-global
+windowing</a> or an
+<a href="#triggers">aggregation trigger</a> in order to perform a <code class="highlighter-rouge">CoGroupByKey</code>. See
+<a href="#groupbykey-and-unbounded-pcollections">GroupByKey and unbounded PCollections</a>
+for more details.</p>
+
+<p><span class="language-java">
+In the Beam SDK for Java, <code class="highlighter-rouge">CoGroupByKey</code> accepts a tuple of keyed
+<code class="highlighter-rouge">PCollection</code>s (<code class="highlighter-rouge">PCollection&lt;KV&lt;K, V&gt;&gt;</code>) as input. For type safety, the SDK
+requires you to pass each <code class="highlighter-rouge">PCollection</code> as part of a <code class="highlighter-rouge">KeyedPCollectionTuple</code>.
+You must declare a <code class="highlighter-rouge">TupleTag</code> for each input <code class="highlighter-rouge">PCollection</code> in the
+<code class="highlighter-rouge">KeyedPCollectionTuple</code> that you want to pass to <code class="highlighter-rouge">CoGroupByKey</code>. As output,
+<code class="highlighter-rouge">CoGroupByKey</code> returns a <code class="highlighter-rouge">PCollection&lt;KV&lt;K, CoGbkResult&gt;&gt;</code>, which groups values
+from all the input <code class="highlighter-rouge">PCollection</code>s by their common keys. Each key (all of type
+<code class="highlighter-rouge">K</code>) will have a different <code class="highlighter-rouge">CoGbkResult</code>, which is a map from <code class="highlighter-rouge">TupleTag&lt;T&gt;</code> to
+<code class="highlighter-rouge">Iterable&lt;T&gt;</code>. You can access a specific collection in a <code class="highlighter-rouge">CoGbkResult</code> object
+by using the <code class="highlighter-rouge">TupleTag</code> that you supplied with the initial collection.
+</span>
+<span class="language-py">
+In the Beam SDK for Python, <code class="highlighter-rouge">CoGroupByKey</code> accepts a dictionary of keyed
+<code class="highlighter-rouge">PCollection</code>s as input. As output, <code class="highlighter-rouge">CoGroupByKey</code> creates a single output
+<code class="highlighter-rouge">PCollection</code> that contains one key/value tuple for each key in the input
+<code class="highlighter-rouge">PCollection</code>s. Each key’s value is a dictionary that maps each tag to an
+iterable of the values under the key in the corresponding <code class="highlighter-rouge">PCollection</code>.
+</span></p>
+
+<p>The following conceptual examples use two input collections to show the mechanics of
+<code class="highlighter-rouge">CoGroupByKey</code>.</p>
+
+<p><span class="language-java">
+The first set of data has a <code class="highlighter-rouge">TupleTag&lt;String&gt;</code> called <code class="highlighter-rouge">emailsTag</code> and contains names
+and email addresses. The second set of data has a <code class="highlighter-rouge">TupleTag&lt;String&gt;</code> called
+<code class="highlighter-rouge">phonesTag</code> and contains names and phone numbers.
+</span>
+<span class="language-py">
+The first set of data contains names and email addresses. The second set of
+data contains names and phone numbers.
+</span></p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">emailsList</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"amy@example.com"</span><span class="o">),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"carl@example.com"</span><span class="o">),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"julia"</span><span class="o">,</span> <span class="s">"julia@example.com"</span><span class="o">),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"carl@email.com"</span><span class="o">));</span>
+
+<span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">phonesList</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"111-222-3333"</span><span class="o">),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"james"</span><span class="o">,</span> <span class="s">"222-333-4444"</span><span class="o">),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="s">"333-444-5555"</span><span class="o">),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="s">"444-555-6666"</span><span class="o">));</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">emails</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"CreateEmails"</span><span class="o">,</span> <span class="n">Create</span><span class="o">. [...]
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">phones</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"CreatePhones"</span><span class="o">,</span> <span class="n">Create</span><span class="o">. [...]
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">emails_list</span> <span class="o">=</span> <span class="p">[</span>
+    <span class="p">(</span><span class="s">'amy'</span><span class="p">,</span> <span class="s">'amy@example.com'</span><span class="p">),</span>
+    <span class="p">(</span><span class="s">'carl'</span><span class="p">,</span> <span class="s">'carl@example.com'</span><span class="p">),</span>
+    <span class="p">(</span><span class="s">'julia'</span><span class="p">,</span> <span class="s">'julia@example.com'</span><span class="p">),</span>
+    <span class="p">(</span><span class="s">'carl'</span><span class="p">,</span> <span class="s">'carl@email.com'</span><span class="p">),</span>
+<span class="p">]</span>
+<span class="n">phones_list</span> <span class="o">=</span> <span class="p">[</span>
+    <span class="p">(</span><span class="s">'amy'</span><span class="p">,</span> <span class="s">'111-222-3333'</span><span class="p">),</span>
+    <span class="p">(</span><span class="s">'james'</span><span class="p">,</span> <span class="s">'222-333-4444'</span><span class="p">),</span>
+    <span class="p">(</span><span class="s">'amy'</span><span class="p">,</span> <span class="s">'333-444-5555'</span><span class="p">),</span>
+    <span class="p">(</span><span class="s">'carl'</span><span class="p">,</span> <span class="s">'444-555-6666'</span><span class="p">),</span>
+<span class="p">]</span>
+
+<span class="n">emails</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'CreateEmails'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">(</span><span class="n">emails_list</span><span class="p">)</span>
+<span class="n">phones</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'CreatePhones'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">(</span><span class="n">phones_list</span><span class="p">)</span>
+</code></pre>
+</div>
+
+<p>After <code class="highlighter-rouge">CoGroupByKey</code>, the resulting data contains all data associated with each
+unique key from any of the input collections.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">emailsTag</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TupleTag</span><span class="o">();</span>
+<span class="kd">final</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">phonesTag</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TupleTag</span><span class="o">();</span>
+
+<span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">&gt;&gt;</span> <span class="n">expectedResults</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"amy"</span><span class="o">,</span> <span class="n">CoGbkResult</span>
+      <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"amy@example.com"</span><span class="o">))</span>
+      <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"111-222-3333"</span><span class="o">,</span> <span class="s">"333-444-5555"</span><span class="o">))),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"carl"</span><span class="o">,</span> <span class="n">CoGbkResult</span>
+      <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"carl@email.com"</span><span class="o">,</span> <span class="s">"carl@example.com"</span><span class="o">))</span>
+      <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"444-555-6666"</span><span class="o">))),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"james"</span><span class="o">,</span> <span class="n">CoGbkResult</span>
+      <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">())</span>
+      <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"222-333-4444"</span><span class="o">))),</span>
+    <span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"julia"</span><span class="o">,</span> <span class="n">CoGbkResult</span>
+      <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"julia@example.com"</span><span class="o">))</span>
+      <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">())));</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">results</span> <span class="o">=</span> <span class="p">[</span>
+    <span class="p">(</span><span class="s">'amy'</span><span class="p">,</span> <span class="p">{</span>
+        <span class="s">'emails'</span><span class="p">:</span> <span class="p">[</span><span class="s">'amy@example.com'</span><span class="p">],</span>
+        <span class="s">'phones'</span><span class="p">:</span> <span class="p">[</span><span class="s">'111-222-3333'</span><span class="p">,</span> <span class="s">'333-444-5555'</span><span class="p">]}),</span>
+    <span class="p">(</span><span class="s">'carl'</span><span class="p">,</span> <span class="p">{</span>
+        <span class="s">'emails'</span><span class="p">:</span> <span class="p">[</span><span class="s">'carl@email.com'</span><span class="p">,</span> <span class="s">'carl@example.com'</span><span class="p">],</span>
+        <span class="s">'phones'</span><span class="p">:</span> <span class="p">[</span><span class="s">'444-555-6666'</span><span class="p">]}),</span>
+    <span class="p">(</span><span class="s">'james'</span><span class="p">,</span> <span class="p">{</span>
+        <span class="s">'emails'</span><span class="p">:</span> <span class="p">[],</span>
+        <span class="s">'phones'</span><span class="p">:</span> <span class="p">[</span><span class="s">'222-333-4444'</span><span class="p">]}),</span>
+    <span class="p">(</span><span class="s">'julia'</span><span class="p">,</span> <span class="p">{</span>
+        <span class="s">'emails'</span><span class="p">:</span> <span class="p">[</span><span class="s">'julia@example.com'</span><span class="p">],</span>
+        <span class="s">'phones'</span><span class="p">:</span> <span class="p">[]}),</span>
+<span class="p">]</span>
+</code></pre>
+</div>
+
+<p>The following code example joins the two <code class="highlighter-rouge">PCollection</code>s with <code class="highlighter-rouge">CoGroupByKey</code>,
+followed by a <code class="highlighter-rouge">ParDo</code> to consume the result. Then, the code uses tags to look up
+and format data from each collection.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">&gt;&gt;</span> <span class="n">results</span> <span class="o">=</span>
+    <span class="n">KeyedPCollectionTuple</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">,</span> <span class="n">emails</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">,</span> <span class="n">phones</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">CoGroupByKey</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">create</span><span class="o">());</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">contactLines</span> <span class="o">=</span> <span class="n">results</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span>
+  <span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">&gt;,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@ProcessElement</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">c</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">CoGbkResult</span><span class="o">&gt;</span> <span class="n">e</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">();</span>
+      <span class="n">String</span> <span class="n">name</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
+      <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">emailsIter</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getAll</span><span class="o">(</span><span class="n">emailsTag</span><span class="o">);</span>
+      <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">phonesIter</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">().</span><span class="na">getAll</span><span class="o">(</span><span class="n">phonesTag</span><span class="o">);</span>
+      <span class="n">String</span> <span class="n">formattedResult</span> <span class="o">=</span> <span class="n">Snippets</span><span class="o">.</span><span class="na">formatCoGbkResults</span><span class="o">(</span><span class="n">name</span><span class="o">,</span> <span class="n">emailsIter</span><span class="o">,</span> <span class="n">phonesIter</span><span class="o">);</span>
+      <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">formattedResult</span><span class="o">);</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">));</span>
 </code></pre>
 </div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># The result PCollection contains one key-value element for each key in the</span>
+<span class="c"># input PCollections. The key of the pair will be the key from the input and</span>
+<span class="c"># the value will be a dictionary with two entries: 'emails', an iterable of</span>
+<span class="c"># all values for the current key in the emails PCollection, and 'phones', an</span>
+<span class="c"># iterable of all values for the current key in the phones PCollection.</span>
+<span class="n">results</span> <span class="o">=</span> <span class="p">({</span><span class="s">'emails'</span><span class="p">:</span> <span class="n">emails</span><span class="p">,</span> <span class="s">'phones'</span><span class="p">:</span> <span class="n">phones</span><span class="p">}</span>
+           <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CoGroupByKey</span><span class="p">())</span>
 
-<p><code class="highlighter-rouge">CoGroupByKey</code> gathers up the values with the same key from all <code class="highlighter-rouge">PCollection</code>s,
-and outputs a new pair consisting of the unique key and an object <code class="highlighter-rouge">CoGbkResult</code>
-containing all values that were associated with that key. If you apply
-<code class="highlighter-rouge">CoGroupByKey</code> to the input collections above, the output collection would look
-like this:</p>
-<div class="highlighter-rouge"><pre class="highlight"><code>user1, [[address1], [order1, order2]]
-user2, [[address2], [order3]]
-user3, [[address3], []]
-guest, [[], [order4]]
-...
+<span class="k">def</span> <span class="nf">join_info</span><span class="p">(</span><span class="n">name_info</span><span class="p">):</span>
+  <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">info</span><span class="p">)</span> <span class="o">=</span> <span class="n">name_info</span>
+  <span class="k">return</span> <span class="s">'</span><span class="si">%</span><span class="s">s; </span><span class="si">%</span><span class="s">s; </span><span class="si">%</span><span class="s">s'</span> <span class="o">%</span>\
+      <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">info</span><span class="p">[</span><span class="s">'emails'</span><span class="p">]),</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">info</span><span class="p">[</span><span class="s">'phones'</span><span class="p">]))</span>
+
+<span class="n">contact_lines</span> <span class="o">=</span> <span class="n">results</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="n">join_info</span><span class="p">)</span>
 </code></pre>
 </div>
 
-<blockquote>
-  <p><strong>A Note on Key/Value Pairs:</strong> Beam represents key/value pairs slightly
-differently depending on the language and SDK you’re using. In the Beam SDK
-for Java, you represent a key/value pair with an object of type <code class="highlighter-rouge">KV&lt;K, V&gt;</code>. In
-Python, you represent key/value pairs with 2-tuples.</p>
-</blockquote>
+<p>The formatted data looks like this:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">formattedResults</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span>
+    <span class="s">"amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']"</span><span class="o">,</span>
+    <span class="s">"carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']"</span><span class="o">,</span>
+    <span class="s">"james; []; ['222-333-4444']"</span><span class="o">,</span>
+    <span class="s">"julia; ['julia@example.com']; []"</span><span class="o">);</span>
+</code></pre>
+</div>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">formatted_results</span> <span class="o">=</span> <span class="p">[</span>
+    <span class="s">"amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']"</span><span class="p">,</span>
+    <span class="s">"carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']"</span><span class="p">,</span>
+    <span class="s">"james; []; ['222-333-4444']"</span><span class="p">,</span>
+    <span class="s">"julia; ['julia@example.com']; []"</span><span class="p">,</span>
+<span class="p">]</span>
+</code></pre>
+</div>
 
 <h4 id="combine">4.2.4. Combine</h4>
 
@@ -1548,7 +1721,7 @@ is a Beam transform for <code class="highlighter-rouge">PCollection</code> objec
 </div>
 
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Flatten takes a tuple of PCollection objects.</span>
-<span class="c"># Returns a single PCollection that contains all of the elements in the</span>
+<span class="c"># Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.</span>
 <span class="n">merged</span> <span class="o">=</span> <span class="p">(</span>
     <span class="p">(</span><span class="n">pcoll1</span><span class="p">,</span> <span class="n">pcoll2</span><span class="p">,</span> <span class="n">pcoll3</span><span class="p">)</span>
     <span class="c"># A list of tuples can be "piped" directly into a Flatten transform.</span>
@@ -2549,7 +2722,7 @@ windows are not actually used until they’re needed for the <code class="highli
 Subsequent transforms, however, are applied to the result of the <code class="highlighter-rouge">GroupByKey</code> –
 data is grouped by both key and window.</p>
 
-<h4 id="using-windowing-with-bounded-pcollections">7.1.2. Using windowing with bounded PCollections</h4>
+<h4 id="windowing-with-bounded-pcollections">7.1.2. Windowing with bounded PCollections</h4>
 
 <p>You can use windowing with fixed-size data sets in <strong>bounded</strong> <code class="highlighter-rouge">PCollection</code>s.
 However, note that windowing considers only the implicit timestamps attached to

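The windowing text in this diff makes three points. A GroupByKey on windowed data groups by both key and window. Every input to a GroupByKey or CoGroupByKey must use the same windowing strategy. Examples are 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds. The plain-Python sketch below models this under simplifying assumptions: integer timestamps in seconds, windows aligned to timestamp 0 with no offset, and hypothetical helpers (`fixed_window`, `sliding_windows`, `group_by_key_and_window`) that are not Beam's WindowFn API. Because the window becomes part of the grouping key, inputs windowed differently would not share groups. That is one way to see why mismatched windows are rejected.

```python
from collections import defaultdict


def fixed_window(ts, size):
    """Hypothetical helper: the [start, end) fixed window, in seconds, containing ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, period):
    """Hypothetical helper: every [start, end) sliding window containing ts.

    Windows start at multiples of `period` and last `size` seconds, so an
    element belongs to size // period windows when period divides size.
    """
    windows = []
    start = ts - ts % period  # latest window start at or before ts
    while start > ts - size:  # window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def group_by_key_and_window(elements, assign):
    """Group (key, value, ts) triples by (key, window), like a windowed GroupByKey."""
    groups = defaultdict(list)
    for key, value, ts in elements:
        for window in assign(ts):
            groups[(key, window)].append(value)
    return dict(groups)


# Hypothetical timestamped elements: (key, value, timestamp in seconds).
events = [('amy', 'a', 10), ('amy', 'b', 290), ('amy', 'c', 310), ('carl', 'd', 20)]

# 5-minute (300 s) fixed windows: 'c' lands in a different window from 'a' and 'b'.
five_minute = group_by_key_and_window(events, lambda ts: [fixed_window(ts, 300)])
print(five_minute)

# 4-minute (240 s) windows starting every 30 s.
print(sliding_windows(1000, size=240, period=30))

# The same timestamp maps to different windows under different strategies.
print(fixed_window(250, 300), fixed_window(250, 240))
```

With 4-minute windows every 30 seconds, each element falls into 240 / 30 = 8 overlapping windows, so a grouping step sees it eight times.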