Posted to commits@beam.apache.org by me...@apache.org on 2017/11/06 21:13:01 UTC

[beam-site] branch mergebot updated (dafa288 -> 593691c)

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


 discard dafa288  This closes #340
 discard 147e380  Add page for the portability framework
     new db37734  [BEAM-1934] Add more CoGroupByKey content/examples
     new 445fa90  Update with Java snippet tags
     new 593691c  This closes #302

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (dafa288)
            \
             N -- N -- N   refs/heads/mergebot (593691c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/contribute/portability.md          | 167 ---------------------------------
 src/documentation/programming-guide.md | 145 +++++++++++++++++++++-------
 2 files changed, 111 insertions(+), 201 deletions(-)
 delete mode 100644 src/contribute/portability.md

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam-site] 02/03: Update with Java snippet tags

Posted by me...@apache.org.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 445fa90b4c6f6e61149602d71300743e48f8da4a
Author: melissa <me...@google.com>
AuthorDate: Fri Oct 27 17:49:13 2017 -0700

    Update with Java snippet tags
---
 src/documentation/programming-guide.md | 74 +++++-----------------------------
 1 file changed, 10 insertions(+), 64 deletions(-)

diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index c2f95ac..f746d7d 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -856,9 +856,9 @@ The following conceptual examples use two input collections to show the mechanic
 `CoGroupByKey`.
 
 <span class="language-java">
-The first set of data has a `TupleTag<String>` called `emailTag` and contains names
+The first set of data has a `TupleTag<String>` called `emailsTag` and contains names
 and email addresses. The second set of data has a `TupleTag<String>` called
-`phoneTag` and contains names and phone numbers.
+`phonesTag` and contains names and phone numbers.
 </span>
 <span class="language-py">
 The first set of data contains names and email addresses. The second set of
@@ -866,18 +866,8 @@ data contains names and phone numbers.
 </span>
 
 ```java
-// This set of data has a `TupleTag<String>` called `emailTag`.
-   "amy" -> "amy@example.com"
-   "carl" -> "carl@example.com"
-   "julia" -> "julia@example.com"
-   "carl" -> "carl@email.com"
-
-// This set of data has a `TupleTag<String>` called `phoneTag`.
-   "amy" -> "111-222-3333"
-   "james" -> "222-333-4444"
-   "amy" -> "333-444-5555"
-   "carl" -> "444-555-6666"
-```
+{% github_sample /apache/beam/blob/master/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java tag:CoGroupByKeyTupleInputs
+%}```
 ```py
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_inputs
 %}```
@@ -886,23 +876,8 @@ After `CoGroupByKey`, the resulting data contains all data associated with each
 unique key from any of the input collections.
 
 ```java
-   "amy" -> {
-      emailTag -> ["amy@example.com"]
-      phoneTag -> ["111-222-3333", "333-444-5555"]
-   }
-   "carl" -> {
-      emailTag -> ["carl@example.com", "carl@email.com"]
-      phoneTag -> ["444-555-6666"]
-   }
-   "james" -> {
-      emailTag -> [],
-      phoneTag -> ["222-333-4444"]
-   }
-   "julia" -> {
-      emailTag -> ["julia@example.com"],
-      phoneTag -> []
-   }
-```
+{% github_sample /apache/beam/blob/master/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java tag:CoGroupByKeyTupleOutputs
+%}```
 ```py
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_outputs
 %}```
@@ -912,37 +887,8 @@ followed by a `ParDo` to consume the result. Then, the code uses tags to look up
 and format data from each collection.
 
 ```java
-  // Each set of key-value pairs is read into separate PCollections.
-  // Each shares a common key ("K").
-  PCollection<KV<K, V1>> pt1 = ...;
-  PCollection<KV<K, V2>> pt2 = ...;
-
-  // Create tuple tags for the value types in each collection.
-  final TupleTag<V1> t1 = new TupleTag<V1>();
-  final TupleTag<V2> t2 = new TupleTag<V2>();
-
-  // Merge collection values into a CoGbkResult collection
-  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-    KeyedPCollectionTuple.of(t1, pt1)
-                         .and(t2, pt2)
-                         .apply(CoGroupByKey.<K>create());
-
-  // Access results and do something with them.
-  PCollection<T> finalResultCollection =
-    coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, T>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-          // Get all collection 1 values
-          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
-          // Get all collection 2 values
-          Iterable<V2> pt2Vals = e.getValue().getAll(t2);
-          // ... Do something ...
-          c.output(...some T...);
-        }
-      }));
-```
+{% github_sample /apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java tag:CoGroupByKeyTuple
+%}```
 ```py
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:model_group_by_key_cogroupbykey_tuple
 %}```
@@ -950,8 +896,8 @@ and format data from each collection.
 The formatted data looks like this:
 
 ```java
-  Sample coming soon.
-```
+{% github_sample /apache/beam/blob/master/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java tag:CoGroupByKeyTupleFormattedOutputs
+%}```
 ```py
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_formatted_outputs
 %}```

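This commit replaces the literal Java input and output listings with `github_sample` tags, so the example data itself now lives in the linked `SnippetsTest.java`. As a reader aid, the grouping that the removed listings showed can be modelled in a few lines of plain Python. This is a sketch under stated assumptions: it does not use the Beam SDK, `co_group_by_key` is a hypothetical helper, the tag strings simply reuse the `emailsTag`/`phonesTag` names, and values are kept in arrival order, whereas Beam does not guarantee any order within a grouped result.

```python
from collections import defaultdict

# Conceptual model only: plain Python, NOT the Beam SDK.
# The tag names reuse `emailsTag` / `phonesTag` from the guide text.
EMAILS_TAG = "emailsTag"
PHONES_TAG = "phonesTag"

emails = [
    ("amy", "amy@example.com"),
    ("carl", "carl@example.com"),
    ("julia", "julia@example.com"),
    ("carl", "carl@email.com"),
]
phones = [
    ("amy", "111-222-3333"),
    ("james", "222-333-4444"),
    ("amy", "333-444-5555"),
    ("carl", "444-555-6666"),
]


def co_group_by_key(tagged_inputs):
    """Hypothetical helper: group values from keyed collections by key.

    tagged_inputs maps a tag to a list of (key, value) pairs. The result
    maps each key to {tag: [values]}, with an empty list for every tag
    that has no values under that key.
    """
    tags = list(tagged_inputs)
    # Every new key starts with an empty list for each input tag.
    result = defaultdict(lambda: {tag: [] for tag in tags})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            result[key][tag].append(value)
    return dict(result)


joined = co_group_by_key({EMAILS_TAG: emails, PHONES_TAG: phones})
for name in sorted(joined):
    print(name, joined[name])
```

Every key present in either input appears exactly once in the result, and a tag with no values under that key maps to an empty list, which is how `james` ends up with no email address and `julia` with no phone number. The dictionary-of-tags shape is closer to the Python SDK description of `CoGroupByKey`; in Java the same lookup goes through a `CoGbkResult` and a `TupleTag`.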

[beam-site] 03/03: This closes #302

Posted by me...@apache.org.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 593691c265a5fd62efb7eb0bcf2ff272e27bd234
Merge: 0679868 445fa90
Author: Mergebot <me...@apache.org>
AuthorDate: Mon Nov 6 21:12:45 2017 +0000

    This closes #302

 src/documentation/programming-guide.md | 145 +++++++++++++++++++++++++--------
 1 file changed, 111 insertions(+), 34 deletions(-)


[beam-site] 01/03: [BEAM-1934] Add more CoGroupByKey content/examples

Posted by me...@apache.org.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit db377341a97d127802feeef1c9d1d981bc2f0fc5
Author: melissa <me...@google.com>
AuthorDate: Wed Aug 23 15:25:47 2017 -0700

    [BEAM-1934] Add more CoGroupByKey content/examples
---
 src/documentation/programming-guide.md | 193 +++++++++++++++++++++++++++------
 1 file changed, 162 insertions(+), 31 deletions(-)

diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index 2ccbd35..c2f95ac 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -785,45 +785,176 @@ tree, [2]
 Thus, `GroupByKey` represents a transform from a multimap (multiple keys to
 individual values) to a uni-map (unique keys to collections of values).
 
+##### 4.2.2.1 GroupByKey and unbounded PCollections
+
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `GroupByKey` or
+[CoGroupByKey](#cogroupbykey). This is because a bounded `GroupByKey` or
+`CoGroupByKey` must wait for all the data with a certain key to be collected,
+but with unbounded collections, the data is unlimited. Windowing and/or triggers
+allow grouping to operate on logical, finite bundles of data within the
+unbounded data streams.
+
+If you do apply `GroupByKey` or `CoGroupByKey` to a group of unbounded
+`PCollection`s without setting either a non-global windowing strategy, a trigger
+strategy, or both for each collection, Beam generates an IllegalStateException
+error at pipeline construction time.
+
+When using `GroupByKey` or `CoGroupByKey` to group `PCollection`s that have a
+[windowing strategy](#windowing) applied, all of the `PCollection`s you want to
+group *must use the same windowing strategy* and window sizing. For example, all
+of the collections you are merging must use (hypothetically) identical 5-minute
+fixed windows, or 4-minute sliding windows starting every 30 seconds.
+
+If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge
+`PCollection`s with incompatible windows, Beam generates an
+IllegalStateException error at pipeline construction time.
+
 #### 4.2.3. CoGroupByKey
 
-`CoGroupByKey` joins two or more key/value `PCollection`s that have the same key
-type, and then emits a collection of `KV<K, CoGbkResult>` pairs. [Design Your
-Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
+`CoGroupByKey` performs a relational join of two or more key/value
+`PCollection`s that have the same key type.
+[Design Your Pipeline]({{ site.baseurl }}/documentation/pipelines/design-your-pipeline/#multiple-sources)
 shows an example pipeline that uses a join.
 
-Given the input collections below:
+Consider using `CoGroupByKey` if you have multiple data sets that provide
+information about related things. For example, let's say you have two different
+files with user data: one file has names and email addresses; the other file
+has names and phone numbers. You can join those two data sets, using the user
+name as a common key and the other data as the associated values. After the
+join, you have one data set that contains all of the information (email
+addresses and phone numbers) associated with each name.
+
+If you are using unbounded `PCollection`s, you must use either [non-global
+windowing](#setting-your-pcollections-windowing-function) or an
+[aggregation trigger](#triggers) in order to perform a `CoGroupByKey`. See
+[GroupByKey and unbounded PCollections](#groupbykey-and-unbounded-pcollections)
+for more details.
+
+<span class="language-java">
+In the Beam SDK for Java, `CoGroupByKey` accepts a tuple of keyed
+`PCollection`s (`PCollection<KV<K, V>>`) as input. For type safety, the SDK
+requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`.
+You must declare a `TupleTag` for each input `PCollection` in the
+`KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output,
+`CoGroupByKey` returns a `PCollection<KV<K, CoGbkResult>>`, which groups values
+from all the input `PCollection`s by their common keys. Each key (all of type
+`K`) will have a different `CoGbkResult`, which is a map from `TupleTag<T>` to
+`Iterable<T>`. You can access a specific collection in a `CoGbkResult` object
+by using the `TupleTag` that you supplied with the initial collection.
+</span>
+<span class="language-py">
+In the Beam SDK for Python, `CoGroupByKey` accepts a dictionary of keyed
+`PCollection`s as input. As output, `CoGroupByKey` creates a single output
+`PCollection` that contains one key/value tuple for each key in the input
+`PCollection`s. Each key's value is a dictionary that maps each tag to an
+iterable of the values under the key in the corresponding `PCollection`.
+</span>
+
+The following conceptual examples use two input collections to show the mechanics of
+`CoGroupByKey`.
+
+<span class="language-java">
+The first set of data has a `TupleTag<String>` called `emailTag` and contains names
+and email addresses. The second set of data has a `TupleTag<String>` called
+`phoneTag` and contains names and phone numbers.
+</span>
+<span class="language-py">
+The first set of data contains names and email addresses. The second set of
+data contains names and phone numbers.
+</span>
+
+```java
+// This set of data has a `TupleTag<String>` called `emailTag`.
+   "amy" -> "amy@example.com"
+   "carl" -> "carl@example.com"
+   "julia" -> "julia@example.com"
+   "carl" -> "carl@email.com"
+
+// This set of data has a `TupleTag<String>` called `phoneTag`.
+   "amy" -> "111-222-3333"
+   "james" -> "222-333-4444"
+   "amy" -> "333-444-5555"
+   "carl" -> "444-555-6666"
 ```
-// collection 1
-user1, address1
-user2, address2
-user3, address3
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_inputs
+%}```
 
-// collection 2
-user1, order1
-user1, order2
-user2, order3
-guest, order4
-...
+After `CoGroupByKey`, the resulting data contains all data associated with each
+unique key from any of the input collections.
+
+```java
+   "amy" -> {
+      emailTag -> ["amy@example.com"]
+      phoneTag -> ["111-222-3333", "333-444-5555"]
+   }
+   "carl" -> {
+      emailTag -> ["carl@example.com", "carl@email.com"]
+      phoneTag -> ["444-555-6666"]
+   }
+   "james" -> {
+      emailTag -> [],
+      phoneTag -> ["222-333-4444"]
+   }
+   "julia" -> {
+      emailTag -> ["julia@example.com"],
+      phoneTag -> []
+   }
 ```
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_outputs
+%}```
 
-`CoGroupByKey` gathers up the values with the same key from all `PCollection`s,
-and outputs a new pair consisting of the unique key and an object `CoGbkResult`
-containing all values that were associated with that key. If you apply
-`CoGroupByKey` to the input collections above, the output collection would look
-like this:
+The following code example joins the two `PCollection`s with `CoGroupByKey`,
+followed by a `ParDo` to consume the result. Then, the code uses tags to look up
+and format data from each collection.
+
+```java
+  // Each set of key-value pairs is read into separate PCollections.
+  // Each shares a common key ("K").
+  PCollection<KV<K, V1>> pt1 = ...;
+  PCollection<KV<K, V2>> pt2 = ...;
+
+  // Create tuple tags for the value types in each collection.
+  final TupleTag<V1> t1 = new TupleTag<V1>();
+  final TupleTag<V2> t2 = new TupleTag<V2>();
+
+  // Merge collection values into a CoGbkResult collection
+  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+    KeyedPCollectionTuple.of(t1, pt1)
+                         .and(t2, pt2)
+                         .apply(CoGroupByKey.<K>create());
+
+  // Access results and do something with them.
+  PCollection<T> finalResultCollection =
+    coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, T>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+          // Get all collection 1 values
+          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
+          // Get all collection 2 values
+          Iterable<V2> pt2Vals = e.getValue().getAll(t2);
+          // ... Do something ...
+          c.output(...some T...);
+        }
+      }));
 ```
-user1, [[address1], [order1, order2]]
-user2, [[address2], [order3]]
-user3, [[address3], []]
-guest, [[], [order4]]
-...
-````
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:model_group_by_key_cogroupbykey_tuple
+%}```
 
-> **A Note on Key/Value Pairs:** Beam represents key/value pairs slightly
-> differently depending on the language and SDK you're using. In the Beam SDK
-> for Java, you represent a key/value pair with an object of type `KV<K, V>`. In
-> Python, you represent key/value pairs with 2-tuples.
+The formatted data looks like this:
+
+```java
+  Sample coming soon.
+```
+```py
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_group_by_key_cogroupbykey_tuple_formatted_outputs
+%}```
 
 #### 4.2.4. Combine
 
@@ -1078,7 +1209,7 @@ PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
 
 ```py
 # Flatten takes a tuple of PCollection objects.
-# Returns a single PCollection that contains all of the elements in the
+# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
 {%
 github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:model_multiple_pcollections_flatten
 %}
@@ -1998,7 +2129,7 @@ windows are not actually used until they're needed for the `GroupByKey`.
 Subsequent transforms, however, are applied to the result of the `GroupByKey` --
 data is grouped by both key and window.
 
-#### 7.1.2. Using windowing with bounded PCollections
+#### 7.1.2. Windowing with bounded PCollections
 
 You can use windowing with fixed-size data sets in **bounded** `PCollection`s.
 However, note that windowing considers only the implicit timestamps attached to

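The programming-guide text in this diff notes that, once windowing applies, data is grouped by both key and window, and that every `PCollection` grouped together must use the same windowing strategy. A minimal plain-Python model of that idea follows, assuming 5-minute fixed windows aligned to timestamp 0. It is not the Beam SDK: `fixed_window_start` and `group_by_key_and_window` are hypothetical helpers, and the timestamped events are invented purely for illustration.

```python
from collections import defaultdict

# Conceptual model only: plain Python, NOT the Beam SDK.
WINDOW_SIZE_SECS = 5 * 60  # assumed 5-minute fixed windows


def fixed_window_start(timestamp_secs, size_secs=WINDOW_SIZE_SECS):
    """Hypothetical helper: start of the fixed window holding a timestamp."""
    return timestamp_secs - (timestamp_secs % size_secs)


def group_by_key_and_window(elements, size_secs=WINDOW_SIZE_SECS):
    """Hypothetical helper: group (key, value, timestamp) triples.

    Values are grouped under (key, window start), so the same key in two
    different windows yields two separate groups.
    """
    grouped = defaultdict(list)
    for key, value, ts in elements:
        grouped[(key, fixed_window_start(ts, size_secs))].append(value)
    return dict(grouped)


# Invented timestamped elements (seconds since an arbitrary epoch).
events = [
    ("amy", "a1", 10),
    ("amy", "a2", 290),
    ("amy", "a3", 310),
    ("carl", "c1", 20),
]
print(group_by_key_and_window(events))
```

Here `amy` produces two groups because its elements fall into the windows starting at 0 and at 300 seconds. One intuition for the same-windowing requirement: if one collection used 5-minute windows and another 4-minute windows, their window boundaries would not line up, so elements sharing a key would often have no common (key, window) group in which to be joined.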