You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "kerrydc (via GitHub)" <gi...@apache.org> on 2023/03/08 15:47:50 UTC

[GitHub] [beam] kerrydc commented on a diff in pull request #23577: [Tour of Beam] Learning content for "Common-transforms" module

kerrydc commented on code in PR #23577:
URL: https://github.com/apache/beam/pull/23577#discussion_r1128107796


##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md:
##########
@@ -0,0 +1,311 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Count
+
+`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+Counts the number of elements within each aggregation. The Count transform has two varieties:
+
+You can count the number of elements in a `PCollection` with `CountElms()`, it will return one element.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.CountElms(s, input)
+}
+```
+
+You can use `Count()` to count how many elements are associated with a particular key. The result will be one output for each key.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.Count(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Counts the number of elements within each aggregation. The Count transform has three varieties:
+
+### Counting all elements in a PCollection
+
+`Count.globally()` counts the number of elements in the entire `PCollection`. The result is a collection with a single element.
+
+```
+PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Long> output = input.apply(Count.globally());
+```
+
+Output
+```
+10
+```
+
+### Counting elements for each key
+
+`Count.perKey()` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perKey());
+```
+
+Output
+
+```
+KV{πŸ₯•, 2}
+KV{πŸ…, 3}
+KV{πŸ†, 1}
+```
+
+### Counting all unique elements
+
+`Count.perElement()` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 3),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perElement());
+```
+
+Output
+
+```
+KV{KV{πŸ…, 3}, 2}
+KV{KV{πŸ₯•, 2}, 1}
+KV{KV{πŸ†, 1}, 1}
+KV{KV{πŸ₯•, 3}, 1}
+KV{KV{πŸ…, 5}, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Counting all elements in a PCollection
+
+You can use `Count.Globally()` to count all elements in a PCollection, even if there are duplicate elements.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  total_elements = (
+      p
+      | 'Create plants' >> beam.Create(
+          ['πŸ“', 'πŸ₯•', 'πŸ₯•', 'πŸ₯•', 'πŸ†', 'πŸ†', 'πŸ…', 'πŸ…', 'πŸ…', '🌽'])
+      | 'Count all elements' >> beam.combiners.Count.Globally()
+      | beam.Map(print))
+```
+
+Output
+
+```
+10
+```
+
+### Counting elements for each key
+
+You can use `Count.PerKey()` to count the elements for each unique key in a PCollection of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  total_elements_per_keys = (
+      p

Review Comment:
   p | [first transform]
   on one line 



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/description.md:
##########
@@ -0,0 +1,141 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Mean
+
+You can use Mean transforms to compute the arithmetic mean of the elements in a collection or the mean of the values associated with each key in a collection of key-value pairs.
+{{if (eq .Sdk "go")}}
+`Mean()` returns a transformation that returns a collection whose content is the average of the elements of the input collection. If there are no elements in the input collection, 0 is returned.
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+

Review Comment:
   create input and add the expected output, similar to java



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/min/description.md:
##########
@@ -0,0 +1,190 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Min
+
+Min transforms find the minimum values globally or for each key in the input collection.
+{{if (eq .Sdk "go")}}
+You can find the global minimum value from the `PCollection` by using `Min()`
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+

Review Comment:
   create data



##########
learning/tour-of-beam/learning-content/common-transforms/filter/description.md:
##########
@@ -0,0 +1,420 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Using Filter
+
+`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate.
+
+{{if (eq .Sdk "go")}}
+```
+import (
+  "github.com/apache/fbeam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
+)
+

Review Comment:
   create input like in Java



##########
learning/tour-of-beam/learning-content/common-transforms/filter/description.md:
##########
@@ -0,0 +1,420 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Using Filter
+
+`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate.
+
+{{if (eq .Sdk "go")}}
+```
+import (
+  "github.com/apache/fbeam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return filter.Exclude(s, input, func(element int) bool {
+    return element % 2 == 1
+  })
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<String> input = pipeline
+        .apply(Create.of(List.of("Hello","world","Hi")));
+
+PCollection<String> filteredStrings = input
+        .apply(Filter.by(new SerializableFunction<String, Boolean>() {
+            @Override
+            public Boolean apply(String input) {
+                return input.length() > 3;
+            }
+        }));
+```
+
+Output
+
+```
+Hello
+world
+```
+
+### Built-in filters
+
+The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen`  With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
+
+Other built-in filters are:
+
+* Filter.greaterThanEq
+* Filter.greaterThan
+* Filter.lessThan
+* Filter.lessThanEq
+* Filter.equal
+
+
+## Example 2: Filtering with a built-in methods
+
+```
+// List of integers
+PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+
+PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3));
+// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point
+
+PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4));
+// PCollection will contain [5, 6, 7, 8, 9, 10] at this point
+
+
+PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10));
+// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point
+
+
+PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7));
+// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point
+
+
+PCollection<Integer> equalNumbers = input.apply(Filter.equal(9));
+// PCollection will contain [9] at this point
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+from log_elements import LogElements
+
+with beam.Pipeline() as p:
+  (p | beam.Create(range(1, 11))
+     | beam.Filter(lambda num: num % 2 == 0)
+     | LogElements())
+```
+
+
+### Example 1: Filtering with a function
+
+You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise.
+
+```
+import apache_beam as beam
+
+def is_perennial(plant):
+  return plant['duration'] == 'perennial'
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(is_perennial)
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 2: Filtering with a lambda function
+
+You can also use lambda functions to simplify Example 1.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >>
+      beam.Filter(lambda plant: plant['duration'] == 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 3: Filtering with multiple arguments
+
+You can pass functions with multiple arguments to `Filter`. They are passed as additional positional arguments or keyword arguments to the function.
+
+In this example, `has_duration` takes `plant` and `duration` as arguments.
+
+```
+import apache_beam as beam
+
+def has_duration(plant, duration):
+  return plant['duration'] == duration
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 4: Filtering with side inputs as singletons
+
+If the `PCollection` has a single value, such as the average from another computation, passing the `PCollection` as a singleton accesses that value.
+
+In this example, we pass a `PCollection` the value **perennial** as a singleton. We then use that value to filter out perennials.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennial = p | 'Perennial' >> beam.Create(['perennial'])
+
+  perennials = (
+      pipeline
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(
+          lambda plant,
+          duration: plant['duration'] == duration,
+          duration=beam.pvalue.AsSingleton(perennial),
+      )
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 5: Filtering with side inputs as iterators
+
+If the `PCollection` has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won’t fit into memory.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  valid_durations = p | 'Valid durations' >> beam.Create([
+      'annual',
+      'biennial',
+      'perennial',
+  ])
+
+  valid_plants = (
+      pipeline

Review Comment:
   see above



##########
learning/tour-of-beam/learning-content/common-transforms/filter/description.md:
##########
@@ -0,0 +1,420 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Using Filter
+
+`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate.
+
+{{if (eq .Sdk "go")}}
+```
+import (
+  "github.com/apache/fbeam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return filter.Exclude(s, input, func(element int) bool {
+    return element % 2 == 1
+  })
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<String> input = pipeline
+        .apply(Create.of(List.of("Hello","world","Hi")));
+
+PCollection<String> filteredStrings = input
+        .apply(Filter.by(new SerializableFunction<String, Boolean>() {
+            @Override
+            public Boolean apply(String input) {
+                return input.length() > 3;
+            }
+        }));
+```
+
+Output
+
+```
+Hello
+world
+```
+
+### Built-in filters
+
+The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen`  With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
+
+Other built-in filters are:
+
+* Filter.greaterThanEq
+* Filter.greaterThan
+* Filter.lessThan
+* Filter.lessThanEq
+* Filter.equal
+
+
+## Example 2: Filtering with a built-in methods
+
+```
+// List of integers
+PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+
+PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3));
+// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point
+
+PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4));
+// PCollection will contain [5, 6, 7, 8, 9, 10] at this point
+
+
+PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10));
+// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point
+
+
+PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7));
+// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point
+
+
+PCollection<Integer> equalNumbers = input.apply(Filter.equal(9));
+// PCollection will contain [9] at this point
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+from log_elements import LogElements
+
+with beam.Pipeline() as p:
+  (p | beam.Create(range(1, 11))
+     | beam.Filter(lambda num: num % 2 == 0)
+     | LogElements())
+```
+
+
+### Example 1: Filtering with a function
+
+You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise.
+
+```
+import apache_beam as beam
+
+def is_perennial(plant):
+  return plant['duration'] == 'perennial'
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(is_perennial)
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 2: Filtering with a lambda function
+
+You can also use lambda functions to simplify Example 1.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >>
+      beam.Filter(lambda plant: plant['duration'] == 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 3: Filtering with multiple arguments
+
+You can pass functions with multiple arguments to `Filter`. They are passed as additional positional arguments or keyword arguments to the function.
+
+In this example, `has_duration` takes `plant` and `duration` as arguments.
+
+```
+import apache_beam as beam
+
+def has_duration(plant, duration):
+  return plant['duration'] == duration
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 4: Filtering with side inputs as singletons
+
+If the `PCollection` has a single value, such as the average from another computation, passing the `PCollection` as a singleton accesses that value.
+
+In this example, we pass a `PCollection` the value **perennial** as a singleton. We then use that value to filter out perennials.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennial = p | 'Perennial' >> beam.Create(['perennial'])
+
+  perennials = (
+      pipeline
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(
+          lambda plant,
+          duration: plant['duration'] == duration,
+          duration=beam.pvalue.AsSingleton(perennial),
+      )
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 5: Filtering with side inputs as iterators
+
+If the `PCollection` has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won’t fit into memory.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  valid_durations = p | 'Valid durations' >> beam.Create([
+      'annual',
+      'biennial',
+      'perennial',
+  ])
+
+  valid_plants = (
+      pipeline
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'PERENNIAL'
+          },
+      ])
+      | 'Filter valid plants' >> beam.Filter(
+          lambda plant,
+          valid_durations: plant['duration'] in valid_durations,
+          valid_durations=beam.pvalue.AsIter(valid_durations),
+      )
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'}
+```
+
+### Example 6: Filtering with side inputs as dictionaries
+
+If a `PCollection` is small enough to fit into memory, then that `PCollection` can be passed as a dictionary. Each element must be a `(key, value)` pair. Note that all the elements of the `PCollection` must fit into memory for this. If the `PCollection` won’t fit into memory, use `beam.pvalue.AsIter(pcollection)` instead.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  keep_duration = p | 'Duration filters' >> beam.Create([
+      ('annual', False),
+      ('biennial', False),
+      ('perennial', True),
+  ])
+
+  perennials = (

Review Comment:
   see above



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/python-example/task.py:
##########
@@ -0,0 +1,53 @@
+#   Licensed to the Apache Software Foundation (ASF) under one
+#   or more contributor license agreements.  See the NOTICE file
+#   distributed with this work for additional information
+#   regarding copyright ownership.  The ASF licenses this file
+#   to you under the Apache License, Version 2.0 (the
+#   "License"); you may not use this file except in compliance
+#   with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# beam-playground:
+#   name: Sum
+#   description: Sum example.
+#   multifile: false
+#   context_line: 42
+#   categories:
+#     - Quickstart
+#   complexity: BASIC
+#   tags:
+#     - hellobeam
+
+
+import apache_beam as beam
+
+# Output PCollection
+class Output(beam.PTransform):
+    class _OutputFn(beam.DoFn):
+        def __init__(self, prefix=''):
+            super().__init__()
+            self.prefix = prefix
+
+        def process(self, element):
+            print(self.prefix+str(element))
+
+    def __init__(self, label=None,prefix=''):
+        super().__init__(label)
+        self.prefix = prefix
+
+    def expand(self, input):
+        input | beam.ParDo(self._OutputFn(self.prefix))
+
+with beam.Pipeline() as p:
+

Review Comment:
   remove blank line



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md:
##########
@@ -0,0 +1,311 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Count
+
+`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+Counts the number of elements within each aggregation. The Count transform has two varieties:
+
+You can count the number of elements in ```PCollection``` with ```CountElms()```, it will return one element.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.CountElms(s, input)
+}
+```
+
+You can use ```Count()``` to count how many elements are associated with a particular key, the result will be one output for each key.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.Count(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Counts the number of elements within each aggregation. The Count transform has three varieties:
+
+### Counting all elements in a PCollection
+
+```Count.globally()``` counts the number of elements in the entire PCollection. The result is a collection with a single element.
+
+```
+PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Long> output = numbers.apply(Count.globally());
+```
+
+Output
+```
+10
+```
+
+### Counting elements for each key
+
+```Count.perKey()``` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perKey());
+```
+
+Output
+
+```
+KV{πŸ₯•, 2}
+KV{πŸ…, 3}
+KV{πŸ†, 1}
+```
+
+### Counting all unique elements
+
+```Count.perElement()``` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 3),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perElement());
+```
+
+Output
+
+```
+KV{KV{πŸ…, 3}, 2}
+KV{KV{πŸ₯•, 2}, 1}
+KV{KV{πŸ†, 1}, 1}
+KV{KV{πŸ₯•, 3}, 1}
+KV{KV{πŸ…, 5}, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Counting all elements in a PCollection
+
+You can use ```Count.Globally()``` to count all elements in a PCollection, even if there are duplicate elements.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total_elements = (
+      pipeline
+      | 'Create plants' >> beam.Create(
+          ['πŸ“', 'πŸ₯•', 'πŸ₯•', 'πŸ₯•', 'πŸ†', 'πŸ†', 'πŸ…', 'πŸ…', 'πŸ…', '🌽'])
+      | 'Count all elements' >> beam.combiners.Count.Globally()
+      | beam.Map(print))
+```
+
+Output
+
+```
+10
+```
+
+### Counting elements for each key
+
+You can use ```Count.PerKey()``` to count the elements for each unique key in a PCollection of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total_elements_per_keys = (
+      pipeline
+      | 'Create plants' >> beam.Create([
+          ('spring', 'πŸ“'),
+          ('spring', 'πŸ₯•'),
+          ('summer', 'πŸ₯•'),
+          ('fall', 'πŸ₯•'),
+          ('spring', 'πŸ†'),
+          ('winter', 'πŸ†'),
+          ('spring', 'πŸ…'),
+          ('summer', 'πŸ…'),
+          ('fall', 'πŸ…'),
+          ('summer', '🌽'),
+      ])
+      | 'Count elements per key' >> beam.combiners.Count.PerKey()
+      | beam.Map(print))
+```
+
+Output
+
+```
+('spring', 4)
+('summer', 3)
+('fall', 2)
+('winter', 1)
+```
+
+### Counting all unique elements
+
+You can use ```Count.PerElement()``` to count only the unique elements in a PCollection.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total_unique_elements = (
+      pipeline
+      | 'Create produce' >> beam.Create(
+          ['πŸ“', 'πŸ₯•', 'πŸ₯•', 'πŸ₯•', 'πŸ†', 'πŸ†', 'πŸ…', 'πŸ…', 'πŸ…', '🌽'])
+      | 'Count unique elements' >> beam.combiners.Count.PerElement()
+      | beam.Map(print))
+```
+
+Output
+
+```
+('πŸ“', 1)
+('πŸ₯•', 3)
+('πŸ†', 2)
+('πŸ…', 3)
+('🌽', 1)
+```
+{{end}}
+### Playground exercise
+
+You can find the full code of this example in the playground window, which you can run and experiment with.
+{{if (eq .Sdk "python")}}
+`Count.globally` returns the number of integers from the `PCollection`. If you replace the `integers input` with this `map input` and replace `beam.combiners.Count.Globally` on `beam.combiners.Count.PerKey` it will output the count numbers by key :
+
+```
+beam.Create([
+    (1, 36),
+    (2, 91),
+    (3, 33),
+    (3, 11),
+    (4, 67),
+]) | beam.combiners.Count.PerKey()
+```
+
+And Count transforms work with strings too! Can you change the example to count the number of words in a given sentence and how often each word occurs?
+
+Count how many words are repeated with `Count`:
+
+```
+  (p | beam.Create(["To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep"])

Review Comment:
   please fix, add spaces after all ,: etc



##########
learning/tour-of-beam/learning-content/common-transforms/filter/description.md:
##########
@@ -0,0 +1,420 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Using Filter
+
+`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate.
+
+{{if (eq .Sdk "go")}}
+```
+import (
+  "github.com/apache/fbeam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return filter.Exclude(s, input, func(element int) bool {
+    return element % 2 == 1
+  })
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<String> input = pipeline
+        .apply(Create.of(List.of("Hello","world","Hi")));
+
+PCollection<String> filteredStrings = input
+        .apply(Filter.by(new SerializableFunction<String, Boolean>() {
+            @Override
+            public Boolean apply(String input) {
+                return input.length() > 3;
+            }
+        }));
+```
+
+Output
+
+```
+Hello
+world
+```
+
+### Built-in filters
+
+The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen`  With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
+
+Other built-in filters are:
+
+* Filter.greaterThanEq
+* Filter.greaterThan
+* Filter.lessThan
+* Filter.lessThanEq
+* Filter.equal
+
+
+## Example 2: Filtering with a built-in methods
+
+```
+// List of integers
+PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+
+PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3));
+// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point
+
+PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4));
+// PCollection will contain [5, 6, 7, 8, 9, 10] at this point
+
+
+PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10));
+// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point
+
+
+PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7));
+// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point
+
+
+PCollection<Integer> equalNumbers = input.apply(Filter.equal(9));
+// PCollection will contain [9] at this point
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+from log_elements import LogElements
+
+with beam.Pipeline() as p:
+  (p | beam.Create(range(1, 11))
+     | beam.Filter(lambda num: num % 2 == 0)
+     | LogElements())
+```
+
+
+### Example 1: Filtering with a function
+
+You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise.
+
+```
+import apache_beam as beam
+
+def is_perennial(plant):
+  return plant['duration'] == 'perennial'
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(is_perennial)
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 2: Filtering with a lambda function
+
+You can also use lambda functions to simplify Example 1.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >>
+      beam.Filter(lambda plant: plant['duration'] == 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 3: Filtering with multiple arguments
+
+You can pass functions with multiple arguments to `Filter`. They are passed as additional positional arguments or keyword arguments to the function.
+
+In this example, `has_duration` takes `plant` and `duration` as arguments.
+
+```
+import apache_beam as beam
+
+def has_duration(plant, duration):
+  return plant['duration'] == duration
+
+with beam.Pipeline() as p:
+  perennials = (
+      p

Review Comment:
   (p | [first transform]
   on one line



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md:
##########
@@ -0,0 +1,311 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Count
+
+`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+Counts the number of elements within each aggregation. The Count transform has two varieties:
+
+You can count the number of elements in ```PCollection``` with ```CountElms()```, it will return one element.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.CountElms(s, input)
+}
+```
+
+You can use ```Count()``` to count how many elements are associated with a particular key, the result will be one output for each key.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.Count(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Counts the number of elements within each aggregation. The Count transform has three varieties:
+
+### Counting all elements in a PCollection
+
+```Count.globally()``` counts the number of elements in the entire PCollection. The result is a collection with a single element.
+
+```
+PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Long> output = numbers.apply(Count.globally());
+```
+
+Output
+```
+10
+```
+
+### Counting elements for each key
+
+```Count.perKey()``` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perKey());
+```
+
+Output
+
+```
+KV{πŸ₯•, 2}
+KV{πŸ…, 3}
+KV{πŸ†, 1}
+```
+
+### Counting all unique elements
+
+```Count.perElement()``` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 3),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perElement());
+```
+
+Output
+
+```
+KV{KV{πŸ…, 3}, 2}
+KV{KV{πŸ₯•, 2}, 1}
+KV{KV{πŸ†, 1}, 1}
+KV{KV{πŸ₯•, 3}, 1}
+KV{KV{πŸ…, 5}, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Counting all elements in a PCollection
+
+You can use ```Count.Globally()``` to count all elements in a PCollection, even if there are duplicate elements.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total_elements = (
+      pipeline
+      | 'Create plants' >> beam.Create(
+          ['πŸ“', 'πŸ₯•', 'πŸ₯•', 'πŸ₯•', 'πŸ†', 'πŸ†', 'πŸ…', 'πŸ…', 'πŸ…', '🌽'])
+      | 'Count all elements' >> beam.combiners.Count.Globally()
+      | beam.Map(print))
+```
+
+Output
+
+```
+10
+```
+
+### Counting elements for each key
+
+You can use ```Count.PerKey()``` to count the elements for each unique key in a PCollection of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total_elements_per_keys = (
+      pipeline
+      | 'Create plants' >> beam.Create([
+          ('spring', 'πŸ“'),
+          ('spring', 'πŸ₯•'),
+          ('summer', 'πŸ₯•'),
+          ('fall', 'πŸ₯•'),
+          ('spring', 'πŸ†'),
+          ('winter', 'πŸ†'),
+          ('spring', 'πŸ…'),
+          ('summer', 'πŸ…'),
+          ('fall', 'πŸ…'),
+          ('summer', '🌽'),
+      ])
+      | 'Count elements per key' >> beam.combiners.Count.PerKey()
+      | beam.Map(print))
+```
+
+Output
+
+```
+('spring', 4)
+('summer', 3)
+('fall', 2)
+('winter', 1)
+```
+
+### Counting all unique elements
+
+You can use ```Count.PerElement()``` to count only the unique elements in a PCollection.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total_unique_elements = (
+      pipeline
+      | 'Create produce' >> beam.Create(
+          ['πŸ“', 'πŸ₯•', 'πŸ₯•', 'πŸ₯•', 'πŸ†', 'πŸ†', 'πŸ…', 'πŸ…', 'πŸ…', '🌽'])
+      | 'Count unique elements' >> beam.combiners.Count.PerElement()
+      | beam.Map(print))
+```
+
+Output
+
+```
+('πŸ“', 1)
+('πŸ₯•', 3)
+('πŸ†', 2)
+('πŸ…', 3)
+('🌽', 1)
+```
+{{end}}
+### Playground exercise
+
+You can find the full code of this example in the playground window, which you can run and experiment with.
+{{if (eq .Sdk "python")}}
+`Count.globally` returns the number of integers from the `PCollection`. If you replace the `integers input` with this `map input` and replace `beam.combiners.Count.Globally` on `beam.combiners.Count.PerKey` it will output the count numbers by key :
+
+```
+beam.Create([

Review Comment:
   please fix



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/description.md:
##########
@@ -0,0 +1,188 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Sum
+
+You can use Sum transforms to compute the sum of the elements in a collection or the sum of the values associated with each key in a collection of key-value pairs.
+{{if (eq .Sdk "go")}}
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return stats.Sum(s, input)
+}
+```
+
+You can use ```SumPerKey()```to calculate the sum Integer associated with each unique key.
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return stats.SumPerKey(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+You can find the global sum value from the ```PCollection``` by using ```Sum.doublesGlobally()```
+
+```
+PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Double> sum = numbers.apply(Sum.doublesGlobally());
+```
+
+Output
+
+```
+55
+```
+
+You can use ```Sum.integersPerKey()```to calculate the sum Integer associated with each unique key (which is of type String).
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Integer>> sumPerKey = input.apply(Sum.integersPerKey());
+```
+
+Output
+
+```
+KV{πŸ†, 1}
+KV{πŸ…, 12}
+KV{πŸ₯•, 5}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Sum of the elements in a PCollection
+
+You can find the global sum value from the ```PCollection``` by using ```CombineGlobally(sum)```
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+  total = (

Review Comment:
   (p = [first transform] on one line



##########
learning/tour-of-beam/learning-content/common-transforms/filter/description.md:
##########
@@ -0,0 +1,420 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Using Filter
+
+`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate.
+
+{{if (eq .Sdk "go")}}
+```
+import (
+  "github.com/apache/fbeam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return filter.Exclude(s, input, func(element int) bool {
+    return element % 2 == 1
+  })
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<String> input = pipeline
+        .apply(Create.of(List.of("Hello","world","Hi")));
+
+PCollection<String> filteredStrings = input
+        .apply(Filter.by(new SerializableFunction<String, Boolean>() {
+            @Override
+            public Boolean apply(String input) {
+                return input.length() > 3;
+            }
+        }));
+```
+
+Output
+
+```
+Hello
+world
+```
+
+### Built-in filters
+
+The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen`  With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
+
+Other built-in filters are:
+
+* Filter.greaterThanEq
+* Filter.greaterThan
+* Filter.lessThan
+* Filter.lessThanEq
+* Filter.equal
+
+
+## Example 2: Filtering with a built-in methods
+
+```
+// List of integers
+PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+
+PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3));
+// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point
+
+PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4));
+// PCollection will contain [5, 6, 7, 8, 9, 10] at this point
+
+
+PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10));
+// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point
+
+
+PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7));
+// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point
+
+
+PCollection<Integer> equalNumbers = input.apply(Filter.equal(9));
+// PCollection will contain [9] at this point
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+from log_elements import LogElements
+
+with beam.Pipeline() as p:
+  (p | beam.Create(range(1, 11))
+     | beam.Filter(lambda num: num % 2 == 0)
+     | LogElements())
+```
+
+
+### Example 1: Filtering with a function
+
+You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise.
+
+```
+import apache_beam as beam
+
+def is_perennial(plant):
+  return plant['duration'] == 'perennial'
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(is_perennial)
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 2: Filtering with a lambda function
+
+You can also use lambda functions to simplify Example 1.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >>
+      beam.Filter(lambda plant: plant['duration'] == 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 3: Filtering with multiple arguments
+
+You can pass functions with multiple arguments to `Filter`. They are passed as additional positional arguments or keyword arguments to the function.
+
+In this example, `has_duration` takes `plant` and `duration` as arguments.
+
+```
+import apache_beam as beam
+
+def has_duration(plant, duration):
+  return plant['duration'] == duration
+
+with beam.Pipeline() as p:
+  perennials = (
+      p
+      | 'Gardening plants' >> beam.Create([
+          {
+              'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'
+          },
+          {
+              'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'
+          },
+          {
+              'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'
+          },
+          {
+              'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'
+          },
+      ])
+      | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+      | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 4: Filtering with side inputs as singletons
+
+If the `PCollection` has a single value, such as the average from another computation, passing the `PCollection` as a singleton accesses that value.
+
+In this example, we pass a `PCollection` the value **perennial** as a singleton. We then use that value to filter out perennials.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  perennial = p | 'Perennial' >> beam.Create(['perennial'])
+
+  perennials = (
+      pipeline

Review Comment:
   can use p1 above and p2 here if needed



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md:
##########
@@ -0,0 +1,311 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Count
+
+`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+Counts the number of elements within each aggregation. The Count transform has two varieties:
+
+You can count the number of elements in a `PCollection` with `CountElms()`, it will return one element.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.CountElms(s, input)
+}
+```
+
+You can use `Count()` to count how many elements are associated with a particular key. The result will be one output for each key.
+
+```
+import (
+    "github.com/apache/beam/sdks/go/pkg/beam"
+    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+    return stats.Count(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Counts the number of elements within each aggregation. The Count transform has three varieties:
+
+### Counting all elements in a PCollection
+
+`Count.globally()` counts the number of elements in the entire `PCollection`. The result is a collection with a single element.
+
+```
+PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Long> output = input.apply(Count.globally());
+```
+
+Output
+```
+10
+```
+
+### Counting elements for each key
+
+`Count.perKey()` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perKey());
+```
+
+Output
+
+```
+KV{πŸ₯•, 2}
+KV{πŸ…, 3}
+KV{πŸ†, 1}
+```
+
+### Counting all unique elements
+
+`Count.perElement()` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 3),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perElement());
+```
+
+Output
+
+```
+KV{KV{πŸ…, 3}, 2}
+KV{KV{πŸ₯•, 2}, 1}
+KV{KV{πŸ†, 1}, 1}
+KV{KV{πŸ₯•, 3}, 1}
+KV{KV{πŸ…, 5}, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Counting all elements in a PCollection
+
+You can use `Count.Globally()` to count all elements in a PCollection, even if there are duplicate elements.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  total_elements = (
+      p

Review Comment:
   For all code:
   Please use consistent formatting. We use 
   p | [first transform]
   on one line in all examples. Please update.



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/max/description.md:
##########
@@ -0,0 +1,192 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Max
+
+`Max` provides a variety of different transforms for computing the maximum values in a collection, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+You can find the global maximum value from the `PCollection` by using `Max()`
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return stats.Max(s, input)
+}
+```
+
+You can use `MaxPerKey()` to calculate the maximum Integer associated with each unique key (which is of type String).
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return stats.MaxPerKey(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+You can find the global maximum value from the `PCollection` by using `Max.doublesGlobally()`
+
+```
+PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Double> max = input.apply(Max.doublesGlobally());
+```
+
+Output
+
+```
+10
+```
+
+You can use `Max.integersPerKey()` to calculate the maximum Integer associated with each unique key (which is of type String).
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Integer>> maxPerKey = input.apply(Max.integersPerKey());
+```
+
+Output
+
+```
+KV{πŸ…, 5}
+KV{πŸ₯•, 3}
+KV{πŸ†, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+
+### Maximum element in a PCollection
+
+You can use `CombineGlobally(lambda elements: max(elements or [None]))` to get the maximum element from the entire `PCollection`.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  max_element = (
+      p
+      | 'Create numbers' >> beam.Create([3, 4, 1, 2])
+      | 'Get max value' >>
+      beam.CombineGlobally(lambda elements: max(elements or [None]))
+      | beam.Map(print))
+```
+
+Output
+
+```
+4
+```
+
+### Maximum elements for each key
+
+You can use `Combine.PerKey()` to get the maximum element for each unique key in a PCollection of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  elements_with_max_value_per_key = (
+      p
+      | 'Create produce' >> beam.Create([
+          ('πŸ₯•', 3),
+          ('πŸ₯•', 2),
+          ('πŸ†', 1),
+          ('πŸ…', 4),
+          ('πŸ…', 5),
+          ('πŸ…', 3),
+      ])
+      | 'Get max value per key' >> beam.CombinePerKey(max)
+      | beam.Map(print))
+```
+
+Output
+
+```
+('πŸ₯•', 3)
+('πŸ†', 1)
+('πŸ…', 5)
+```
+{{end}}
+
+### Playground exercise
+
+You can find the full code of this example in the playground window, which you can run and experiment with.
+{{if (eq .Sdk "go")}}
+`Max` returns the maximum number from the `PCollection`. If you replace the `integers input` with this `map input`:
+
+```
+input:= beam.ParDo(s, func(_ []byte, emit func(int, int)){
+  emit(1,1)
+  emit(1,4)
+  emit(2,6)
+  emit(2,3)
+  emit(2,-4)
+  emit(3,23)
+}, beam.Impulse(s))
+```
+
+And replace `stats.Max` with `stats.MaxPerKey` it will output the maximum numbers by key.
+{{end}}
+{{if (eq .Sdk "java")}}
+`Max.doublesGlobally` returns the maximum element from the `PCollection`. If you replace the `integers input` with this `map input`:
+
+```
+PCollection<KV<Integer, Integer>> input = pipeline.apply(
+    Create.of(KV.of(1, 11),
+    KV.of(1, 36),
+    KV.of(2, 91),
+    KV.of(3, 33),
+    KV.of(3, 11),
+    KV.of(4, 33)));
+```
+
+And replace `Max.doublesGlobally` with `Max.integersPerKey` it will output the maximum numbers by key. It is also necessary to replace the generic type:
+
+```
+PCollection<KV<Integer, Integer>> output = applyTransform(input);
+```
+
+```
+static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) {
+        return input.apply(Max.integersPerKey());
+    }
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+`Top.Largest` returns larger numbers from `PCCollection` than specified in the function argument. If you replace the `integers input` with this `map input` and replace `beam.combiners.Top.Largest(5)` with `beam.CombinePerKey(max)` it will output the maximum numbers by key :

Review Comment:
   This is not correct. Please change to:
   `Top.Largest(n)` returns the top n largest numbers from the `PCCollection`.
   
   https://beam.apache.org/documentation/transforms/python/aggregation/top/



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/max/python-example/task.py:
##########
@@ -0,0 +1,53 @@
+#   Licensed to the Apache Software Foundation (ASF) under one
+#   or more contributor license agreements.  See the NOTICE file
+#   distributed with this work for additional information
+#   regarding copyright ownership.  The ASF licenses this file
+#   to you under the Apache License, Version 2.0 (the
+#   "License"); you may not use this file except in compliance
+#   with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# beam-playground:
+#   name: Max
+#   description: Max example.
+#   multifile: false
+#   context_line: 48
+#   categories:
+#     - Quickstart
+#   complexity: BASIC
+#   tags:
+#     - hellobeam
+
+
+import apache_beam as beam
+
+# Output PCollection
+class Output(beam.PTransform):
+    class _OutputFn(beam.DoFn):
+        def __init__(self, prefix=''):
+            super().__init__()
+            self.prefix = prefix
+
+        def process(self, element):
+            print(self.prefix+str(element))
+
+    def __init__(self, label=None,prefix=''):
+        super().__init__(label)
+        self.prefix = prefix
+
+    def expand(self, input):
+        input | beam.ParDo(self._OutputFn(self.prefix))
+
+with beam.Pipeline() as p:
+
+  (p | beam.Create(range(1, 11))
+  # beam.combiners.Top.Largest(5) to return the larger than [5] from `PCollection`.

Review Comment:
   Not correct. Replace with:
   beam.combiners.Top.Largest(5) to return the 5 largest values from the `PCollection`.



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/min/description.md:
##########
@@ -0,0 +1,190 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Min
+
+Min transforms find the minimum values globally or for each key in the input collection.
+{{if (eq .Sdk "go")}}
+You can find the global minimum value from the `PCollection` by using `Min()`
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return stats.Min(s, input)
+}
+```
+
+You can use `MinPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String).
+
+```
+import (
+  "github.com/apache/beam/sdks/go/pkg/beam"
+  "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+  return stats.MinPerKey(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+You can find the global minimum value from the `PCollection` by using `Min.doublesGlobally()`
+
+```
+PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Double> min = input.apply(Min.doublesGlobally());
+```
+
+Output
+
+```
+1
+```
+
+You can use `Min.integersPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String).
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("πŸ₯•", 3),
+              KV.of("πŸ₯•", 2),
+              KV.of("πŸ†", 1),
+              KV.of("πŸ…", 4),
+              KV.of("πŸ…", 5),
+              KV.of("πŸ…", 3)));
+PCollection<KV<String, Integer>> minPerKey = input.apply(Min.integersPerKey());
+```
+
+Output
+
+```
+KV{πŸ†, 1}
+KV{πŸ₯•, 2}
+KV{πŸ…, 3}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Minimum element in a PCollection
+
+You can use `CombineGlobally(lambda elements: min(elements or [-1]))` to get the minimum element from the entire `PCollection`.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  min_element = (
+      p

Review Comment:
   (p | [first transform]
   on one line



##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/min/python-example/task.py:
##########
@@ -0,0 +1,53 @@
+#   Licensed to the Apache Software Foundation (ASF) under one
+#   or more contributor license agreements.  See the NOTICE file
+#   distributed with this work for additional information
+#   regarding copyright ownership.  The ASF licenses this file
+#   to you under the Apache License, Version 2.0 (the
+#   "License"); you may not use this file except in compliance
+#   with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# beam-playground:
+#   name: Min
+#   description: Min example.
+#   multifile: false
+#   context_line: 48
+#   categories:
+#     - Quickstart
+#   complexity: BASIC
+#   tags:
+#     - hellobeam
+
+
+import apache_beam as beam
+
+# Output PCollection
+class Output(beam.PTransform):
+    class _OutputFn(beam.DoFn):
+        def __init__(self, prefix=''):
+            super().__init__()
+            self.prefix = prefix
+
+        def process(self, element):
+            print(self.prefix+str(element))
+
+    def __init__(self, label=None,prefix=''):
+        super().__init__(label)
+        self.prefix = prefix
+
+    def expand(self, input):
+        input | beam.ParDo(self._OutputFn(self.prefix))
+
+with beam.Pipeline() as p:
+
+  (p | beam.Create(range(1, 11))
+   # beam.combiners.Top.Smallest(5) to return the small number than 5 from `PCollection`.

Review Comment:
   Not correct. Should be:
   beam.combiners.Top.Smallest(5) to return the 5 smallest numbers from the `PCollection`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org