Posted to commits@beam.apache.org by da...@apache.org on 2023/05/15 13:29:10 UTC

[beam] branch master updated: [Tour of Beam] Learning content for "Schema-based Transforms" module (#25256)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c8a4f5b6654 [Tour of Beam] Learning content for "Schema-based Transforms" module (#25256)
c8a4f5b6654 is described below

commit c8a4f5b66540be519fe225441993c9f1c5049c01
Author: Abzal Tuganbay <ab...@gmail.com>
AuthorDate: Mon May 15 19:28:59 2023 +0600

    [Tour of Beam] Learning content for "Schema-based Transforms" module (#25256)
    
    * add new format for schema-based
    
    * minor formatting
    
    * correct examples
    
    * correct examples
    
    * fix whitespace warning
    
    * missed whitespace warning
    
    * minor fix
    
    * correct examples
    
    * add Serializable
    
    * add import
    
    * test coder
    
    * fixing merge error
    
    * test coder
    
    * add coder
    
    * add coders
    
    * change to StringCoder
    
    * check rename
    
    * converting data
    
    * testing another schema transforms
    
    * testing another schema transforms
    
    * test toString
    
    * print all row
    
    * change logic
    
    * fix examples
    
    * add coders and fix
    
    ---------
    
    Co-authored-by: oborysevych <ol...@akvelon.com>
    Co-authored-by: mende1esmende1es <me...@gmail.cp>
---
 .../learning-content/content-info.yaml             |   1 +
 .../creating-pipeline/description.md               |   2 +-
 .../co-group/description.md                        | 107 +++++++++
 .../co-group/java-example/Task.java                | 213 +++++++++++++++++
 .../co-group/unit-info.yaml}                       |  12 +-
 .../schema-based-transforms/coder/description.md   | 153 ++++++++++++
 .../coder/java-example/Task.java                   | 183 +++++++++++++++
 .../coder/unit-info.yaml}                          |  12 +-
 .../schema-based-transforms/convert/description.md |  35 +++
 .../convert/java-example/Task.java                 | 230 ++++++++++++++++++
 .../convert/unit-info.yaml}                        |  12 +-
 .../schema-based-transforms/filter/description.md  |  74 ++++++
 .../filter/java-example/Task.java                  | 212 +++++++++++++++++
 .../filter/unit-info.yaml}                         |  12 +-
 .../schema-based-transforms/group/description.md   |  65 ++++++
 .../group/java-example/Task.java                   | 210 +++++++++++++++++
 .../group/unit-info.yaml}                          |  12 +-
 .../schema-based-transforms/join/description.md    |  51 ++++
 .../join/java-example/Task.java                    | 236 +++++++++++++++++++
 .../join/unit-info.yaml}                           |  12 +-
 .../module-info.yaml}                              |  21 +-
 .../motivating-challenge/description.md            |  39 ++++
 .../motivating-challenge/hint1.md                  |  24 ++
 .../motivating-challenge/java-challenge/Task.java  | 225 ++++++++++++++++++
 .../motivating-challenge/java-solution/Task.java   | 259 +++++++++++++++++++++
 .../motivating-challenge/unit-info.yaml}           |  12 +-
 .../schema-based-transforms/rename/description.md  |  36 +++
 .../rename/java-example/Task.java                  | 218 +++++++++++++++++
 .../rename/unit-info.yaml}                         |  12 +-
 .../schema-concept/creating-schema/description.md  | 197 ++++++++++++++++
 .../creating-schema/java-example/Task.java         | 181 ++++++++++++++
 .../schema-concept/creating-schema/unit-info.yaml} |  12 +-
 .../schema-concept/group-info.yaml}                |  13 +-
 .../schema-concept/logical-type/description.md     | 104 +++++++++
 .../schema-concept/logical-type/unit-info.yaml}    |  12 +-
 .../schema-based-transforms/select/description.md  | 125 ++++++++++
 .../select/java-example/Task.java                  | 199 ++++++++++++++++
 .../select/unit-info.yaml}                         |  12 +-
 38 files changed, 3442 insertions(+), 103 deletions(-)

diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/content-info.yaml
index 27bebdf26d6..8f1adc4703c 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/content-info.yaml
@@ -25,5 +25,6 @@ content:
   - introduction
   - common-transforms
   - core-transforms
+  - schema-based-transforms
   - windowing
   - triggers
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md
index 152a89d9090..991f8d4afaa 100644
--- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md
+++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/description.md
@@ -39,7 +39,7 @@ Pipeline p = Pipeline.create(options);
 ```
 import apache_beam as beam
 
-with beam.Pipeline() as pipeline:
+with beam.Pipeline() as p:
   pass  # build your pipeline here
 ```
 {{end}}
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/description.md
new file mode 100644
index 00000000000..41cb1a0f46f
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/description.md
@@ -0,0 +1,107 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# CoGroup
+
+A transform that performs equijoins across multiple schema PCollections.
+
+This transform is similar to `CoGroupByKey`, but it works on PCollections that have schemas, which allows users of the transform to simply specify the schema fields to join on. The output type of the transform is a `Row` that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but it can optionally be expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`).
+
+For example, the following demonstrates joining three PCollections on the "user" and "country" fields:
+
+
+```
+PCollection<Row> input = PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
+    .apply(CoGroup.join(By.fieldNames("user", "country")));
+```
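+
+The joined output can then be inspected per key. Below is a minimal, illustrative sketch (not part of the example above) that assumes the output layout described earlier: a "key" sub-row holding the join fields and one ITERABLE field per input, named after its tuple tag:
+
+```
+PCollection<String> keys = input.apply(MapElements
+    .into(TypeDescriptors.strings())
+    .via(row -> {
+        // The "key" field is a sub-row holding the join fields ("user", "country").
+        Row key = row.getRow("key");
+        // Each input contributes an ITERABLE field named after its tuple tag, e.g. "input1".
+        int fromInput1 = 0;
+        for (Row joined : row.<Row>getIterable("input1")) {
+            fromInput1++;
+        }
+        return key.getString("user") + "/" + key.getString("country") + ": " + fromInput1 + " rows from input1";
+    }));
+```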
+
+### JOIN DIFFERENT FIELDS
+
+It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection.
+
+For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> input = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+   .apply(CoGroup
+     .join("input1Tag", By.fieldNames("referringUser"))
+     .join("input2Tag", By.fieldNames("user")));
+```
+
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> input = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> input = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+                 .join("input2", By.fieldNames("user"))
+                 .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> input = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join("input1", By.fieldNames("user"))
+                 .join("input2", By.fieldNames("user").withOptionalParticipation())
+                 .crossProductJoin());
+```
+
+### FULL OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> input = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+                 .join("input2", By.fieldNames("user").withOptionalParticipation())
+                 .crossProductJoin());
+```
+
+### Playground exercise
+
+In the playground window you can find examples of using `CoGroup`. By running this example, you will see user statistics for certain games.
+
+You can combine several classes.
+
+Can you add your **own class** `UserDetails` with a **firstName** field and a **lastName** field:
+
+```
+public static class UserDetails {
+        public String userId;
+        public String userFirstName;
+        public String userLastName;
+}
+```
+
+You can also join more than two collections:
+```
+PCollection<UserDetails> details = pipeline.apply(Create.of(new UserDetails("userId","first","last")));
+PCollection<Row> coGroupPCollection =
+                PCollectionTuple.of("user", userInfo).and("game", gameInfo).and("details", details)
+                        .apply(CoGroup.join(CoGroup.By.fieldNames("userId")));
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/java-example/Task.java
new file mode 100644
index 00000000000..d7807986c3c
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/java-example/Task.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: co-group
+//   description: CoGroup example.
+//   multifile: false
+//   context_line: 128
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.CoGroup;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+
+        @SchemaCreate
+        public User(String userId, String userName) {
+            this.userId = userId;
+            this.userName = userName;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    '}';
+        }
+    }
+
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema userSchema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .build();
+
+        Schema gameSchema = Schema.builder()
+                .addStringField("userId")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        Schema totalSchema = Schema.builder()
+                .addRowField("key", Schema.builder().addStringField("userId").build())
+                .addArrayField("game", Schema.FieldType.row(gameSchema))
+                .addArrayField("user", Schema.FieldType.row(userSchema))
+                .build();
+
+        PCollection<User> userInfo = getUserPCollection(pipeline);
+
+        PCollection<User> userRows = userInfo
+                .setSchema(userSchema, TypeDescriptor.of(User.class), row ->
+                        {
+                            User user = row;
+                            return Row.withSchema(userSchema)
+                                    .addValues(user.userId, user.userName)
+                                    .build();
+                        },
+                        row -> new User(row.getString(0), row.getString(1))
+                );
+
+        PCollection<Game> gameInfo = getGamePCollection(pipeline);
+
+        PCollection<Game> gameRows = gameInfo
+                .setSchema(gameSchema,
+                        TypeDescriptor.of(Game.class), row ->
+                        {
+                            Game game = row;
+                            return Row.withSchema(gameSchema)
+                                    .addValues(game.userId, game.score, game.gameId, game.date)
+                                    .build();
+                        },
+                        row -> new Game(row.getString(0), row.getInt32(1), row.getString(2), row.getString(3)));
+
+        PCollection<Row> coGroupPCollection =
+                PCollectionTuple.of("user", userRows).and("game", gameRows)
+                        .apply(CoGroup.join(CoGroup.By.fieldNames("userId")));
+
+        coGroupPCollection
+                .setRowSchema(totalSchema)
+                .setCoder(RowCoder.of(totalSchema))
+                .apply("User flatten row", ParDo.of(new LogOutput<>("Flattened")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getUserPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserFn()));
+    }
+
+    public static PCollection<Game> getGamePCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn()));
+    }
+
+    static class ExtractUserFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1]));
+        }
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, Game> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new Game(items[0], Integer.valueOf(items[2]), items[3], items[4]));
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/co-group/unit-info.yaml
index 27bebdf26d6..0a322acb474 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/co-group/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: co-group
+name: CoGroup
+taskName: co-group
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/coder/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/coder/description.md
new file mode 100644
index 00000000000..cf1efa3e481
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/coder/description.md
@@ -0,0 +1,153 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A `Coder<T>` defines how to encode and decode values of type `T` into byte streams.
+
+> Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. You need to do such parsing or formatting explicitly, using transforms such as `ParDo` or `MapElements`.
+
+The Beam SDK requires a coder for every `PCollection` in your pipeline. In many cases, Beam can automatically infer the `Coder` for the type in a `PCollection` and use predefined coders to perform encoding and decoding. However, in some cases, you will need to specify the `Coder` explicitly or create a `Coder` for custom types.
+
+To set the `Coder` for a `PCollection`, call `PCollection.setCoder`. You can also get the `Coder` associated with a `PCollection` using the `PCollection.getCoder` method.
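+
+For instance, a minimal sketch (the `words` collection here is just an assumed example):
+
+```
+PCollection<String> words = pipeline.apply(Create.of("a", "b", "c"));
+
+// Explicitly set the coder for this collection...
+words.setCoder(StringUtf8Coder.of());
+
+// ...and read back the coder currently associated with it.
+Coder<String> coder = words.getCoder();
+```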
+
+### CoderRegistry
+
+When Beam tries to infer a `Coder` for a `PCollection`, it uses the mappings stored in the `CoderRegistry` object associated with the pipeline. You can access the `CoderRegistry` for a given pipeline using the `Pipeline.getCoderRegistry` method or get a coder for a particular type using `CoderRegistry.getCoder`.
+
+Please note that since each `PCollection` carries its own `Coder`, the same type can be encoded/decoded differently in different `PCollection`s.
+
+The following example demonstrates how to register a coder for a type using `CoderRegistry`:
+
+```
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline pipeline = Pipeline.create(options);
+
+CoderRegistry cr = pipeline.getCoderRegistry();
+cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
+```
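+
+To look up the coder that the registry would infer for a particular type, you can use `CoderRegistry.getCoder` (a small sketch; note that `getCoder` throws a checked `CannotProvideCoderException` when no coder is known):
+
+```
+// Throws CannotProvideCoderException if no coder is registered or inferable for the type.
+Coder<Integer> integerCoder = cr.getCoder(Integer.class);
+```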
+
+### Specifying default coder for a type
+
+You can specify the default coder for your custom type by annotating it with the `@DefaultCoder` annotation. For example:
+```
+@DefaultCoder(AvroCoder.class)
+public class MyCustomDataType {
+  ...
+}
+```
+
+
+`Coder` classes for compound types are often composed of coder classes for the types contained therein. The composition of `Coder` instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of `Coder` classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.
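+
+As a small illustration of such composition (using coders from `org.apache.beam.sdk.coders`):
+
+```
+// A coder for a compound type is composed from the coders of the types it contains.
+Coder<List<String>> listCoder = ListCoder.of(StringUtf8Coder.of());
+
+// KvCoder composes a key coder and a value coder in the same way.
+Coder<KV<String, Integer>> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+```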
+
+When you create custom objects and schemas, you need to create a subclass of Coder for your object and implement the following methods:
+* `encode` - converting objects to bytes
+* `decode` - converting bytes to objects
+* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throws `Coder.NonDeterministicException` if the encoding is not deterministic.
+
+When you read data and want to represent it as a structured object, you will need a DTO class. In this case, `VendorToPassengerDTO`:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    // Function for TypeDescription
+    public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    // Setter
+    // Getter
+    // ToString
+}
+```
+
+The pipeline can't apply select, group, and similar transforms because it doesn't understand the data structure, so we need to write our own `Coder`:
+
+```
+class CustomCoderSecond extends Coder<VendorToPassengerDTO> {
+    final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final CustomCoderSecond INSTANCE = new CustomCoderSecond();
+
+    public static CustomCoderSecond of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void encode(VendorToPassengerDTO dto, OutputStream outStream) throws IOException {
+        final String result = dto.toString();
+        outStream.write(result.getBytes());
+    }
+
+    @Override
+    public VendorToPassengerDTO decode(InputStream inStream) throws IOException {
+        final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+        return objectMapper.readValue(serializedDTOs, VendorToPassengerDTO.class);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+}
+```
+
+### Playground exercise
+
+In the playground window you can find examples of using `Coder`. By running this example, you will see user statistics in certain games.
+
+You can add a new `winner` field:
+```
+public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+        public Boolean winner;
+}
+```
+
+If the `score` is more than **10**, set `winner` to `true` in the `Coder`.
+
+```
+String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date + "," + user.game.winner;
+```
+
+```
+String[] game = params[1].split(",");
+int score = Integer.parseInt(game[0]);
+boolean winner = false;
+if (score > 10) {
+    winner = true;
+}
+return new User(user[0], user[1], new Game(user[0], score, game[1], game[2], winner));
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/coder/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/coder/java-example/Task.java
new file mode 100644
index 00000000000..b5d4cd73959
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/coder/java-example/Task.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: coder
+//   description: Coder example.
+//   multifile: false
+//   context_line: 139
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Objects;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.io.Serializable;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+        public String userId;
+        public String userName;
+
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName, Game game) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> input = getProgressPCollection(pipeline);
+
+        input
+                .setCoder(CustomCoder.of())
+                .apply("User", ParDo.of(new LogOutput<>("User row")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(10);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractGameStatisticsFn()));
+    }
+
+    static class ExtractGameStatisticsFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], Integer.valueOf(items[2]), items[3], items[4])));
+        }
+    }
+
+    static class CustomCoder extends Coder<User> {
+        private static final CustomCoder INSTANCE = new CustomCoder();
+
+        public static CustomCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(";");
+            String[] user = params[0].split(",");
+            String[] game = params[1].split(",");
+            return new User(user[0], user[1], new Game(user[0], Integer.valueOf(game[0]), game[1], game[2]));
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/coder/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/coder/unit-info.yaml
index 27bebdf26d6..ab91acd0ded 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/coder/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: coder
+name: Coder
+taskName: coder
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/convert/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/convert/description.md
new file mode 100644
index 00000000000..07bfab4249b
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/convert/description.md
@@ -0,0 +1,35 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Converting between types
+
+As mentioned, Beam can automatically convert between different Java types, as long as those types have equivalent schemas. One way to do this is by using the `Convert` transform, as follows.
+
+```
+PCollection<Object> input = pipeline.apply(Create.of(user1));
+
+// Convert the object to a Row
+PCollection<Row> convertedToRow = input.apply(Convert.toRows());
+```
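+
+Converting in the other direction works the same way. As a sketch, assuming a hypothetical `UserSummary` class whose schema is equivalent to the rows above:
+
+```
+// Rows convert back to a POJO with an equivalent schema (UserSummary is a hypothetical example class)
+PCollection<UserSummary> converted = convertedToRow.apply(Convert.to(UserSummary.class));
+```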
+
+### Playground exercise
+
+In the playground window you can find examples of using `Convert`. By running this example, you will see user statistics for certain games.
+You can convert to rows and set the schema in a single chain:
+
+```
+PCollection<Row> userRow = fullStatistics
+                .apply(Convert.toRows())
+                .setRowSchema(type)
+                .apply("User", ParDo.of(new LogOutput<>("ToRows")));
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/convert/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/convert/java-example/Task.java
new file mode 100644
index 00000000000..90103e435f7
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/convert/java-example/Task.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: convert
+//   description: Convert example.
+//   multifile: false
+//   context_line: 133
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.transforms.RenameFields;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.io.Serializable;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName, Game game) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> input = getProgressPCollection(pipeline);
+
+        Schema gameSchema = Schema.builder()
+                .addStringField("userId")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        Schema schema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .addRowField("game", gameSchema)
+                .build();
+
+        PCollection<Row> pCollection = input
+                .setSchema(schema,
+                        TypeDescriptor.of(User.class), user ->
+                        {
+                            Game game = user.game;
+
+                            Row gameRow = Row.withSchema(gameSchema)
+                                    .addValues(game.userId, game.score, game.gameId, game.date)
+                                    .build();
+
+                            return Row.withSchema(schema)
+                                    .addValues(user.userId, user.userName, gameRow).build();
+                        },
+                        row -> {
+                            String userId = row.getValue("userId");
+                            String userName = row.getValue("userName");
+                            Row game = row.getValue("game");
+
+                            String gameId = game.getValue("gameId");
+                            Integer gameScore = game.getValue("score");
+                            String gameDate = game.getValue("date");
+                            return new User(userId, userName, new Game(userId, gameScore, gameId, gameDate));
+                        })
+                .apply(Convert.to(Row.class))
+                .setCoder(RowCoder.of(schema));
+
+        pCollection
+                .apply("User", ParDo.of(new LogOutput<>("Convert to Result")));
+
+        pipeline.run();
+    }
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(10);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn())).setCoder(CustomCoder.of());
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], Integer.valueOf(items[2]), items[3], items[4])
+            ));
+        }
+    }
+
+    static class CustomCoder extends Coder<User> {
+        private static final CustomCoder INSTANCE = new CustomCoder();
+
+        public static CustomCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(";");
+            String[] user = params[0].split(",");
+            String[] game = params[1].split(",");
+            return new User(user[0], user[1], new Game(user[0], Integer.valueOf(game[0]), game[1], game[2]));
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/convert/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/convert/unit-info.yaml
index 27bebdf26d6..3f57601f09a 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/convert/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: convert
+name: Convert
+taskName: convert
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/filter/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/filter/description.md
new file mode 100644
index 00000000000..22b581b661b
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/filter/description.md
@@ -0,0 +1,74 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate predicates can be registered for different schema fields, and an element is allowed to pass only if all predicates return true. The output type is the same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+   public double latitude;
+   public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:
+
+```
+PCollection<Location> locations = readLocations();
+locations.apply(Filter
+   .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 40.699)
+   .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude > -74.747));
+```
+
+### Multiple fields filter
+
+You can also use multiple fields inside the filtering predicate. For example, consider the following schema type representing a user account:
+
+```
+class UserAccount {
+   public double spendOnBooks;
+   public double spendOnMovies;
+   ...
+}
+```
+
+Let's say you'd like to process only users whose total spend is over $100. You could write:
+
+```
+PCollection<UserAccount> input = readUsers();
+input.apply(Filter
+    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+        row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+```
+
+### Playground exercise
+
+In the playground window you can find examples of using the schema `Filter` transform.
+By running this example, you will see user statistics for certain games.
+You can filter by multiple fields:
+```
+.apply(Filter.create().whereFieldNames(Arrays.asList("userId", "score"), new SerializableFunction<Row, Boolean>() {
+                    @Override
+                    public Boolean apply(Row input) {
+                        return input.getString("userId").toLowerCase().startsWith("a") || input.getInt32("score") > 10;
+                    }
+}))
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/filter/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/filter/java-example/Task.java
new file mode 100644
index 00000000000..53ab7cb1e69
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/filter/java-example/Task.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: schema-filter
+//   description: Schema filter example.
+//   multifile: false
+//   context_line: 130
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Filter;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.io.Serializable;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName, Game game) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> input = getProgressPCollection(pipeline).setCoder(CustomCoder.of());
+
+        Schema type = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        PCollection<User> pCollection = input
+                .apply(MapElements.into(TypeDescriptor.of(Object.class)).via(it -> it))
+                .setSchema(type,
+                        TypeDescriptor.of(Object.class), row ->
+                        {
+                            User user = (User) row;
+                            return Row.withSchema(type)
+                                    .addValues(user.userId, user.userName, user.game.score, user.game.gameId, user.game.date)
+                                    .build();
+                        },
+                        row -> new User(row.getString(0), row.getString(1),
+                                new Game(row.getString(0), row.getInt32(2), row.getString(3), row.getString(4)))
+                )
+                .apply(Filter.create().whereFieldName("score", score -> (int) score > 11))
+                .apply(MapElements.into(TypeDescriptor.of(User.class)).via(user -> (User) user));
+
+        pCollection
+                .setCoder(CustomCoder.of())
+                .apply("User", ParDo.of(new LogOutput<>("Filtered")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn()));
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], Integer.valueOf(items[2]), items[3], items[4])
+            ));
+        }
+    }
+
+    static class CustomCoder extends Coder<User> {
+        private static final CustomCoder INSTANCE = new CustomCoder();
+
+        public static CustomCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(";");
+            String[] user = params[0].split(",");
+            String[] game = params[1].split(",");
+            return new User(user[0], user[1], new Game(user[0], Integer.valueOf(game[0]), game[1], game[2]));
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/filter/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/filter/unit-info.yaml
index 27bebdf26d6..72c14249ea8 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/filter/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: schema-filter
+name: Filter
+taskName: schema-filter
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/group/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/group/description.md
new file mode 100644
index 00000000000..95d185b7ea7
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/group/description.md
@@ -0,0 +1,65 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Group
+
+The `Group` transform can be used to group records in a `PCollection` by one or several fields of the input schema. You can also apply aggregations to those groupings, which is the most common use of the `Group` transform.
+
+The output of the `Group` transform has a schema with one field corresponding to each aggregation.
+
+When used without a combiner, this transform simply acts as a `GroupByKey`, except that you don't have to explicitly extract the grouping keys.
+
+For example, consider the following input schema:
+```
+public class UserPurchase {
+   public String userId;
+   public String country;
+   public long cost;
+   public double transactionDuration;
+ }
+```
+
+### Group by fields
+
+You can group all purchases by user and country as follows:
+
+```
+PCollection<Row> byUser = input.apply(Group.byFieldNames("userId", "country"));
+```
+
+### Group with aggregation
+
+You will likely be using grouping to aggregate input data. The builder methods inside the `Group` class allow the creation of separate aggregations for every field (or set of fields) on the input schema and generate an output schema based on these aggregations. For example:
+
+```
+PCollection<Row> aggregated = input
+     .apply(Group.byFieldNames("userId", "country")
+          .aggregateField("cost", Sum.ofLongs(), "total_cost")
+          .aggregateField("cost", Top.<Long>largestLongsFn(10), "top_purchases")
+          .aggregateField("transactionDuration", ApproximateQuantilesCombineFn.create(21),
+              Field.of("transactionDurations", FieldType.array(FieldType.INT64))));
+```
+
+The result will be a new row schema containing the fields **total_cost**, **top_purchases**, and **transactionDurations**: the sum of all purchase costs (for that user and country), the top ten purchases, and a histogram of transaction durations. The schema will also contain a key field, which is a row containing the **userId** and **country** fields.
+
+> Note that usually the field type can be automatically inferred from the `Combine.CombineFn` passed in. However, sometimes it cannot be inferred due to Java type erasure. In such a case, you need to specify the field type using `Schema.Field`. In the above example, the type is explicitly specified for the `transactionDurations` field.
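+
+The grouped output can then be consumed like any other schema `PCollection`. Here is a minimal sketch, assuming the default output layout in which the grouping key is nested under `key` and the aggregates under `value` (the `MapElements` step is only illustrative):
+```
+aggregated.apply(MapElements.into(TypeDescriptors.strings())
+    .via(row -> row.getRow("key").getString("userId") + ": " + row.getRow("value").getInt64("total_cost")));
+```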
+
+### Playground exercise
+
+In the playground window, you can find an example of using the `Group` transform. Running it prints each user's total score computed from the sample gaming data.
+
+Instead of `Sum`, you can use other `CombineFn` functions:
+```
+.apply(Group.byFieldNames("userName").aggregateField("score", Max.ofIntegers(), "total"))
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/group/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/group/java-example/Task.java
new file mode 100644
index 00000000000..0a628d27f20
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/group/java-example/Task.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: group
+//   description: Group example.
+//   multifile: false
+//   context_line: 130
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName, Game game) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> input = getProgressPCollection(pipeline);
+
+        Schema type = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        PCollection<String> pCollection = input
+                .apply(MapElements.into(TypeDescriptor.of(Object.class)).via(it -> it))
+                .setSchema(type,
+                        TypeDescriptor.of(Object.class), row ->
+                        {
+                            User user = (User) row;
+                            return Row.withSchema(type)
+                                    .addValues(user.userId, user.userName, user.game.score, user.game.gameId, user.game.date)
+                                    .build();
+                        },
+                        row -> new User(row.getString(0), row.getString(1),
+                                new Game(row.getString(0), row.getInt32(2), row.getString(3), row.getString(4)))
+                )
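+                // Group rows by userId, summing each user's score into a "total" field;
+                // the next step reads the grouping key (row 0) and the aggregate (row 1) as text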
+                .apply(Group.byFieldNames("userId").aggregateField("score", Sum.ofIntegers(), "total"))
+                .apply(MapElements.into(TypeDescriptor.of(String.class)).via(row -> row.getRow(0).getValue(0) + " : " + row.getRow(1).getValue(0)));
+
+        pCollection
+                .setCoder(StringUtf8Coder.of())
+                .apply("User flatten row", ParDo.of(new LogOutput<>("Flattened")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn())).setCoder(CustomCoder.of());
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], Integer.valueOf(items[2]), items[3], items[4])
+            ));
+        }
+    }
+
+    static class CustomCoder extends Coder<User> {
+        private static final CustomCoder INSTANCE = new CustomCoder();
+
+        public static CustomCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(";");
+            String[] user = params[0].split(",");
+            String[] game = params[1].split(",");
+            return new User(user[0], user[1], new Game(user[0], Integer.valueOf(game[0]), game[1], game[2]));
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/group/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/group/unit-info.yaml
index 27bebdf26d6..b49d9e097ce 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/group/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: group
+name: Group
+taskName: group
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/join/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/join/description.md
new file mode 100644
index 00000000000..b3db2afbd79
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/join/description.md
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Joins
+
+The `Join` transform performs equijoins across two schema `PCollections`.
+
+This transform allows joins between two input `PCollections` simply by specifying the fields to join on. The resulting `PCollection<Row>` will have two fields named "**lhs**" and "**rhs**" respectively, each with the schema of the corresponding input `PCollection`.
+
+For example, the following demonstrates joining two `PCollections` using a natural join on the "**user**" and "**country**" fields, where both the left-hand and the right-hand `PCollections` have fields with these names.
+
+```
+PCollection<Row> joined = input1.apply(Join.innerJoin(input2).using("user", "country"));
+```
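+
+Because the joined rows nest the original records under **lhs** and **rhs**, individual fields can be read back out of the result. A minimal sketch (the `MapElements` step here is only illustrative):
+```
+joined.apply(MapElements.into(TypeDescriptors.strings())
+    .via(row -> row.getRow("lhs").getString("user") + " / " + row.getRow("rhs").getString("country")));
+```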
+
+If the right-hand `PCollection` contains fields with different names to join against, you can specify them as follows:
+
+```
+PCollection<Row> joined = input1.apply(Join.innerJoin(input2)
+       .on(FieldsEqual.left("user", "country").right("otherUser", "otherCountry")));
+```
+
+### Supported methods
+
+* `Full outer join`
+* `Left outer join`
+* `Right outer join`
+* `Inner join`
+* `Left inner join`
+* `Right inner join`
+
+
+### Playground exercise
+
+In the playground window, you can find an example of using the `Join` transform. Running it joins the user and game collections on `userId` and logs the joined rows.
+
+You can use other join types simply by changing the method name:
+```
+.apply(Join.fullOuterJoin(gameInfo).using("userId"));
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/join/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/join/java-example/Task.java
new file mode 100644
index 00000000000..ae6075b0500
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/join/java-example/Task.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: join
+//   description: Join example.
+//   multifile: false
+//   context_line: 129
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Join;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" + "userId='" + userId + '\'' + ", score='" + score + '\'' + ", gameId='" + gameId + '\'' + ", date='" + date + '\'' + '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+        public String userId;
+        public String userName;
+
+        @SchemaCreate
+        public User(String userId, String userName) {
+            this.userId = userId;
+            this.userName = userName;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" + "userId='" + userId + '\'' + ", userName='" + userName + '\'' + '}';
+        }
+    }
+
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema userSchema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .build();
+
+        Schema gameSchema = Schema.builder()
+                .addStringField("userId")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        PCollection<User> userInfo = getUserPCollection(pipeline).setSchema(userSchema, TypeDescriptor.of(User.class), input -> {
+            User user = input;
+            return Row.withSchema(userSchema).addValues(user.userId, user.userName).build();
+        }, input -> new User(input.getString(0), input.getString(1)));
+
+        PCollection<Game> gameInfo = getGamePCollection(pipeline).setSchema(gameSchema, TypeDescriptor.of(Game.class), row -> {
+            Game game = row;
+            return Row.withSchema(gameSchema).addValues(game.userId, game.score, game.gameId, game.date).build();
+        }, row -> new Game(row.getString(0), row.getInt32(1), row.getString(2), row.getString(3)));
+
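+        // Inner join the user and game collections on their shared userId field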
+        PCollection<Row> pCollection = userInfo.apply(Join.<User, Game>innerJoin(gameInfo).using("userId"));
+
+        pCollection.apply("User flatten row", ParDo.of(new LogOutput<>("Flattened")));
+
+        pipeline.run();
+    }
+
+
+    static class UserCoder extends Coder<Task.User> {
+        private static final UserCoder INSTANCE = new UserCoder();
+
+        public static UserCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(Task.User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public Task.User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(",");
+            return new Task.User(params[0], params[1]);
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class GameCoder extends Coder<Task.Game> {
+        private static final GameCoder INSTANCE = new GameCoder();
+
+        public static GameCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(Task.Game game, OutputStream outStream) throws IOException {
+            String line = game.userId + "," + game.score + "," + game.gameId + "," + game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public Task.Game decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(",");
+            return new Task.Game(params[0], Integer.valueOf(params[1]), params[2], params[3]);
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    public static PCollection<User> getUserPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserFn())).setCoder(UserCoder.of());
+    }
+
+    public static PCollection<Game> getGamePCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn())).setCoder(GameCoder.of());
+    }
+
+    static class ExtractUserFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1]));
+        }
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, Game> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new Game(items[0], Integer.valueOf(items[2]), items[3], items[4]));
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/join/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/join/unit-info.yaml
index 27bebdf26d6..2f68f9a0b09 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/join/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: join
+name: Join
+taskName: join
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/module-info.yaml
similarity index 81%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/module-info.yaml
index 27bebdf26d6..ea647d3181e 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/module-info.yaml
@@ -19,11 +19,18 @@
 
 sdk:
   - Java
-  - Python
-  - Go
+id: schema-based-transforms
+name: Schema-based transforms
+complexity: ADVANCED
 content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+  - schema-concept
+  - select
+  - join
+  - group
+  - filter
+  - co-group
+  - convert
+  - rename
+  - coder
+
+
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/description.md
new file mode 100644
index 00000000000..23ba099d53b
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/description.md
@@ -0,0 +1,39 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Schema Based Transforms motivating challenge
+
+You are given a `PCollection` of per-user game statistics read from a csv file, and a `PCollection` of user data. You need to group them by user, **summing up all** the points, and output the users who have more than **11** points. Don't forget to write a `coder`.
+
+
+`Game PCollection`:
+| userId                   | score | gameId        | date                |
+|--------------------------|-------|---------------|---------------------|
+| user16_AmaranthKoala     | 18    | 1447719060000 | 2015-11-16 16:11:04 |
+| user10_AndroidGreenKoala | 2     | 1447719060000 | 2015-11-16 16:11:04 |
+| user9_AuburnCockatoo     | 5     | 1447719060000 | 2015-11-16 16:11:04 |
+| ...                      | ...   | ...           | ...                 |
+
+
+
+`User PCollection`:
+| userId                   | userName          |
+|--------------------------|-------------------|
+| user16_AmaranthKoala     | AmaranthKoala     |
+| user10_AndroidGreenKoala | AndroidGreenKoala |
+| user9_AuburnCockatoo     | AuburnCockatoo    |
+| ...                      | ...               |
+
+
+You can preview the input [file](https://storage.googleapis.com/apache-beam-samples/game/small/gaming_data.csv)
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/hint1.md b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/hint1.md
new file mode 100644
index 00000000000..a563b9e9bbb
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/hint1.md
@@ -0,0 +1,24 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+To solve this challenge, you may build a pipeline that consists of the following steps (a rough sketch of the resulting transform chain follows the list):
+1. First you need to convert a collection of users to an `Object`: `.apply(MapElements.into(TypeDescriptor.of(Object.class)).via(it -> it))`.
+2. You have to pass the `userSchema` to `setSchema(...)` and write a `TypeDescriptor`. Also pass transformations of `Object to Row` and `Row to Object`.
+3. Make a `Join` with `gameInfo` using `userId` field.
+4. Make conversions to `Object` using `MapElements` which will return `allFieldSchema`.
+5. Write your own coder that accepts `Object`.
+6. Make a `setRowSchema()` that accepts `allFieldSchema`.
+7. `Group` by `userId` field and **sum up all** `points`.
+8. Make conversions to `Object` using `MapElements` which will return `totalSchema`.
+9. Make `setCoder()` with your coder.
+10. Make a `setRowSchema()` that accepts `totalSchema`.
+11. `Filter` by the `total` field where it is **greater than 11**.
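+
+Putting the join, group, and filter steps together, the core of the pipeline may look roughly like the following sketch, using the `userInfo` and `gameInfo` collections from the starter code; the schema setup and intermediate conversions from the earlier steps are omitted:
+```
+PCollection<Row> result = userInfo
+    .apply(Join.<User, Game>innerJoin(gameInfo).using("userId"))
+    .apply(Group.<Row>byFieldNames("lhs.userId").aggregateField("rhs.score", Sum.ofIntegers(), "total"))
+    .apply(Filter.<Row>create().whereFieldName("value.total", total -> (int) total > 11));
+```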
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/java-challenge/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/java-challenge/Task.java
new file mode 100644
index 00000000000..03029525cfe
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/java-challenge/Task.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: SchemaBasedChallenge
+//   description: Schema Based Challenge example.
+//   multifile: false
+//   context_line: 137
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+
+        @SchemaCreate
+        public User(String userId, String userName) {
+            this.userId = userId;
+            this.userName = userName;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    '}';
+        }
+    }
+
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> userInfo = getUserPCollection(pipeline);
+        PCollection<Game> gameInfo = getGamePCollection(pipeline);
+
+        userInfo
+                .apply("User", ParDo.of(new LogOutput<>("Users")));
+
+        gameInfo
+                .apply("Game", ParDo.of(new LogOutput<>("Games")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getUserPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(10);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserFn())).setCoder(UserCoder.of());
+    }
+
+    public static PCollection<Game> getGamePCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn())).setCoder(GameCoder.of());
+    }
+
+    static class ExtractUserFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1]));
+        }
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, Game> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new Game(items[0], Integer.valueOf(items[2]), items[3], items[4]));
+        }
+    }
+
+    static class UserCoder extends Coder<Task.User> {
+        private static final UserCoder INSTANCE = new UserCoder();
+
+        public static UserCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(Task.User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public Task.User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(",");
+            return new Task.User(params[0], params[1]);
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+
+    static class GameCoder extends Coder<Task.Game> {
+        private static final GameCoder INSTANCE = new GameCoder();
+
+        public static GameCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(Task.Game game, OutputStream outStream) throws IOException {
+            String line = game.userId + "," + game.score + "," + game.gameId + "," + game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public Task.Game decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(",");
+            return new Task.Game(params[0], Integer.valueOf(params[1]), params[2], params[3]);
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/java-solution/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/java-solution/Task.java
new file mode 100644
index 00000000000..a3699cd90b2
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/java-solution/Task.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: SchemaBasedSolution
+//   description: Schema Based Solution example.
+//   multifile: false
+//   context_line: 137
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.transforms.Filter;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.schemas.transforms.Join;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.io.Serializable;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+
+        @SchemaCreate
+        public User(String userId, String userName) {
+            this.userId = userId;
+            this.userName = userName;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    '}';
+        }
+    }
+
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema userSchema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .build();
+
+        Schema gameSchema = Schema.builder()
+                .addStringField("userId")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        PCollection<User> userInfo = getUserPCollection(pipeline)
+                .setSchema(userSchema, TypeDescriptor.of(User.class), input -> {
+                            User user = input;
+                            return Row.withSchema(userSchema).addValues(user.userId, user.userName).build();
+                        }, input -> new User(input.getString(0), input.getString(1))
+                );
+
+        PCollection<Game> gameInfo = getGamePCollection(pipeline).setSchema(gameSchema, TypeDescriptor.of(Game.class), row -> {
+            Game game = row;
+            return Row.withSchema(gameSchema).addValues(game.userId, game.score, game.gameId, game.date).build();
+        }, row -> new Game(row.getString(0), row.getInt32(1), row.getString(2), row.getString(3)));
+
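+        // Join users with their games on userId, sum each user's scores into "total",
+        // then keep only the grouped rows whose total is greater than 11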
+        PCollection<Row> pCollection = userInfo
+                .apply(Join.<User, Game>innerJoin(gameInfo).using("userId"))
+                .apply(Group.<Row>byFieldNames("lhs.userId").aggregateField("rhs.score", Sum.ofIntegers(), "total"))
+                .apply(Filter.<Row>create().whereFieldName("value.total", s -> (int) s > 11));
+
+        pCollection
+                .apply("User", ParDo.of(new LogOutput<>("Results")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getUserPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserFn())).setCoder(UserCoder.of());
+    }
+
+    public static PCollection<Game> getGamePCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn())).setCoder(GameCoder.of());
+    }
+
+    static class ExtractUserFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1]));
+        }
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, Game> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new Game(items[0], Integer.valueOf(items[2]), items[3], items[4]));
+        }
+    }
+
+    static class UserCoder extends Coder<Task.User> {
+        private static final UserCoder INSTANCE = new UserCoder();
+
+        public static UserCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(Task.User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public Task.User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(",");
+            return new Task.User(params[0], params[1]);
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class GameCoder extends Coder<Task.Game> {
+        private static final GameCoder INSTANCE = new GameCoder();
+
+        public static GameCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(Task.Game game, OutputStream outStream) throws IOException {
+            String line = game.userId + "," + game.score + "," + game.gameId + "," + game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public Task.Game decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(",");
+            return new Task.Game(params[0], Integer.valueOf(params[1]), params[2], params[3]);
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/unit-info.yaml
similarity index 86%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/unit-info.yaml
index 27bebdf26d6..a6852a4cf9a 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/motivating-challenge/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: schema-motivating-challenge
+name: Schema Based Challenge
+taskName: SchemaBasedChallenge
+solutionName: SchemaBasedSolution
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/rename/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/rename/description.md
new file mode 100644
index 00000000000..1957532fa8b
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/rename/description.md
@@ -0,0 +1,36 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Renaming schema fields
+
+```RenameFields``` allows specific fields in a schema to be renamed. The field values in input rows are left unchanged; only the schema is modified. This transform is often used to prepare records for output to a schema-aware sink, such as an RDBMS, to make sure that the ```PCollection``` schema field names match those of the output. It can also be used to rename fields generated by other transforms to make them more usable (similar to SELECT AS in SQL). Nested fields can also be renamed.
+
+```
+input.apply(RenameFields.<PurchasePojo>create()
+  .rename("userId", "userIdentifier")
+  .rename("shippingAddress.streetAddress", "shippingAddress.street"));
+```
+
+This results in the same set of unmodified input elements; however, the schema of the ```PCollection``` has been changed to rename **userId** to **userIdentifier** and **shippingAddress.streetAddress** to **shippingAddress.street**.
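+
+Downstream schema-aware transforms then refer to the new field names. A minimal sketch, assuming the renamed output is captured in a `renamed` variable and using `Select` purely for illustration:
+```
+renamed.apply(Select.fieldNames("userIdentifier"));
+```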
+
+### Playground exercise
+
+In the playground window, you can find an example of using `RenameFields`. Running it logs per-user game statistics after the schema fields have been renamed.
+Since the `score` field is part of the same flattened schema as `userId`, it can be renamed directly as well:
+```
+.apply(RenameFields.create()
+                        .rename("userId", "id")
+                        .rename("userName", "name")
+                        .rename("score", "point"));
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/rename/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/rename/java-example/Task.java
new file mode 100644
index 00000000000..a48ba2b71ca
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/rename/java-example/Task.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: rename
+//   description: Rename example.
+//   multifile: false
+//   context_line: 133
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.transforms.Group;
+import org.apache.beam.sdk.schemas.transforms.RenameFields;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.io.Serializable;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+
+        public String userId;
+        public String userName;
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName
+                , Game game
+        ) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> input = getProgressPCollection(pipeline);
+
+        Schema type = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .addInt32Field("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        PCollection<Row> pCollection = input
+                .apply(MapElements.into(TypeDescriptor.of(Object.class)).via(it -> it))
+                .setSchema(type,
+                        TypeDescriptor.of(Object.class), row ->
+                        {
+                            User user = (User) row;
+                            return Row.withSchema(type)
+                                    .addValues(user.userId, user.userName, user.game.score, user.game.gameId, user.game.date)
+                                    .build();
+                        },
+                        row -> new User(row.getString(0), row.getString(1),
+                                new Game(row.getString(0), row.getInt32(2), row.getString(3), row.getString(4)))
+                )
+                .apply(RenameFields.create()
+                        .rename("userId", "id")
+                        .rename("userName", "name"));
+
+        pCollection
+                .setCoder(RowCoder.of(type))
+                .apply("User flatten row", ParDo.of(new LogOutput<>("Flattened")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn())).setCoder(CustomCoder.of());
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], Integer.valueOf(items[2]), items[3], items[4])
+            ));
+        }
+    }
+
+    static class CustomCoder extends Coder<User> {
+        private static final CustomCoder INSTANCE = new CustomCoder();
+
+        public static CustomCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(";");
+            String[] user = params[0].split(",");
+            String[] game = params[1].split(",");
+            return new User(user[0], user[1], new Game(user[0], Integer.valueOf(game[0]), game[1], game[2]));
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/rename/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/rename/unit-info.yaml
index 27bebdf26d6..7f384d652b6 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/rename/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: rename
+name: Rename
+taskName: rename
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/description.md
new file mode 100644
index 00000000000..6560a88d51e
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/description.md
@@ -0,0 +1,197 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Overview
+
+Most structured records share some common characteristics:
+
+* They can be subdivided into separate named fields. Fields usually have string names, but sometimes - as in the case of indexed tuples - have numerical indices instead.
+
+* There is a limited set of primitive types that a field can have. These often match the primitive types found in most programming languages: int, long, string, etc.
+
+* Often a field type can be marked as optional (sometimes referred to as nullable) or required.
+
+Often records have a nested structure. A nested structure occurs when a field itself has subfields, so the type of the field itself has a schema. Fields that are array or map types are also a common feature of these structured records.
+
+For example, consider the following schema, representing actions in a fictitious e-commerce company:
+
+**Purchase**
+
+```
+Field Name              Field Type
+userId                  STRING
+itemId                  INT64
+shippingAddress         ROW(ShippingAddress)
+costCents               INT64
+transactions            ARRAY[ROW(Transaction)]
+```
+
+**ShippingAddress**
+
+```
+Field Name              Field Type
+streetAddress           STRING
+city                    STRING
+state                   nullable STRING
+country                 STRING
+postCode                STRING
+```
+
+**Transaction**
+
+```
+Field Name              Field Type
+bank                    STRING
+purchaseAmount          DOUBLE
+```
+
+Schemas provide us with a type system for Beam records that is independent of any specific programming-language type. There might be multiple types of Java objects that all have the same schema. For example, you could implement the same schema as a Protocol Buffer class or as a POJO class.
+
+Schemas also provide a simple way to reason about types across different programming-language APIs.
+
+A `PCollection` with a schema does not need to have a `Coder` specified, as Beam knows how to encode and decode Schema rows; Beam uses a special coder to encode schema types.
+
+### Creating Schemas
+
+While schemas themselves are language independent, they are designed to embed naturally into the programming languages of the Beam SDK being used. This allows Beam users to continue using native types while reaping the advantage of having Beam understand their element schemas.
+
+In Java you could use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class.
+
+#### Java POJOs
+
+A `POJO` (Plain Old Java Object) is a Java object that is not bound by any restriction other than the Java Language Specification. A `POJO` can contain member variables that are primitives, other POJOs, or collections, maps, or arrays thereof. `POJO`s do not have to extend prespecified classes or implement any specific interfaces.
+
+If a `POJO` class is annotated with `@DefaultSchema(JavaFieldSchema.class)`, Beam will automatically infer a schema for this class. Nested classes are supported, as are classes with List, array, and Map fields.
+
+For example, annotating the following class tells Beam to infer a schema from this `POJO` class and apply it to any `PCollection<TransactionPojo>`.
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+public class TransactionPojo {
+  public final String bank;
+  public final double purchaseAmount;
+  @SchemaCreate
+  public TransactionPojo(String bank, double purchaseAmount) {
+    this.bank = bank;
+    this.purchaseAmount = purchaseAmount;
+  }
+}
+// Beam will automatically infer the correct schema for this PCollection. No coder is needed as a result.
+PCollection<TransactionPojo> pojos = readPojos();
+```
+The `@SchemaCreate` annotation indicates to Beam that instances of the `TransactionPojo` class can be created using the annotated constructor, as long as the constructor parameters have the same names as the field names. Additionally, this annotation can also be applied to static factory methods on the class, even if the constructor is private. In the absence of the `@SchemaCreate` annotation, all fields must be non-final and the class must have a zero-argument constructor.
+
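+As a brief illustration, here is a sketch of `@SchemaCreate` applied to a static factory method, reusing the `TransactionPojo` fields above (the `create` method name is only an example):
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+public class TransactionPojo {
+  public final String bank;
+  public final double purchaseAmount;
+
+  private TransactionPojo(String bank, double purchaseAmount) {
+    this.bank = bank;
+    this.purchaseAmount = purchaseAmount;
+  }
+
+  // The constructor stays private; Beam creates instances through this factory method,
+  // whose parameter names match the field names.
+  @SchemaCreate
+  public static TransactionPojo create(String bank, double purchaseAmount) {
+    return new TransactionPojo(bank, purchaseAmount);
+  }
+}
+```
+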
+A couple of other useful annotations affect how Beam infers schemas. By default, the schema field names will match those of the class fields. However, `@SchemaFieldName` can be used to specify a different name to be used for the schema field.
+
+You can use `@SchemaIgnore` to mark specific class fields as excluded from the inferred schema. For example, it’s common to have ephemeral fields in a class that should not be included in a schema (e.g., caching the hash value to prevent expensive recomputation), and `@SchemaIgnore` allows you to exclude such fields. Note that ignored fields will be excluded from encoding as well.
+
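+For example, a minimal sketch combining both annotations (the `cachedHash` field is a hypothetical ephemeral field). Since the sketch omits `@SchemaCreate`, the fields are non-final and the implicit zero-argument constructor is used:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+public class TransactionPojo {
+  @SchemaFieldName("bankName")
+  public String bank;          // appears in the schema as "bankName"
+  public double purchaseAmount;
+  @SchemaIgnore
+  public int cachedHash;       // excluded from the schema and from encoding
+}
+```
+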
+In some cases it is not convenient to annotate the POJO class, for example if the POJO is in a different package that is not owned by the Beam pipeline author. In these cases, schema inference can be triggered programmatically in the pipeline’s main function as follows:
+
+```
+pipeline.getSchemaRegistry().registerPOJO(TransactionPOJO.class);
+```
+
+#### Java Beans
+
+Java Beans are a de-facto standard for creating reusable property classes in Java. While the full standard has many characteristics, the key ones are that all fields must be accessed using getters and setters, and the name format for these getters and setters is standardized. A Java Bean class can be annotated with `@DefaultSchema(JavaBeanSchema.class)`, and Beam will automatically infer a schema for this class.
+
+Similarly to POJO classes, the `@SchemaCreate` annotation can be used to specify a constructor or a static factory method, in which case the setters and zero-argument constructor can be omitted. Otherwise, Beam will use the zero-argument constructor and setters to instantiate the class.
+
+```
+@DefaultSchema(JavaBeanSchema.class)
+public class Purchase {
+  public String getUserId();  // Returns the id of the user who made the purchase.
+  public long getItemId();  // Returns the identifier of the item that was purchased.
+  public ShippingAddress getShippingAddress();  // Returns the shipping address, a nested type.
+  public long getCostCents();  // Returns the cost of the item.
+  public List<Transaction> getTransactions();  // Returns the transactions that paid for this purchase (returns a list, since the purchase might be spread out over multiple credit cards).
+
+  @SchemaCreate
+  public Purchase(String userId, long itemId, ShippingAddress shippingAddress, long costCents, List<Transaction> transactions) {
+      ...
+  }
+}
+```
+
+`@SchemaFieldName` and `@SchemaIgnore` can be used to alter the schema inferred, just like with `POJO` classes.
+
+#### AutoValue
+
+Java value classes are notoriously difficult to generate correctly. This is because there is a lot of boilerplate you must write to implement a value class properly. AutoValue is a popular library for simplifying value class creation.
+Beam can infer a schema from an `AutoValue` class. For example:
+
+```
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class ShippingAddress {
+  public abstract String streetAddress();
+  public abstract String city();
+  public abstract String state();
+  public abstract String country();
+  public abstract String postCode();
+}
+```
+
+This is all that’s needed to generate a simple `AutoValue` class, and the above `@DefaultSchema` annotation tells Beam to infer a schema from it. This also allows `AutoValue` elements to be used inside of `PCollections`.
+
+You can also use `@SchemaFieldName` and `@SchemaIgnore` annotations to specify different schema field names or ignore fields, respectively.
+
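+For example, here is a sketch of a hypothetical `Transaction` AutoValue class that renames one of its schema fields:
+
+```
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class Transaction {
+  @SchemaFieldName("bankName")
+  public abstract String bank();   // exposed in the schema as "bankName"
+  public abstract double purchaseAmount();
+}
+```
+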
+### Playground exercise
+
+In the playground window, you can find examples of creating schemas. By running this example, you will see user statistics for certain games.
+To use `@DefaultSchema(JavaBeanSchema.class)`, the class needs getters and setters:
+```
+@DefaultSchema(JavaBeanSchema.class)
+public static class Game {
+        public String userId;
+        public String score;
+        public String gameId;
+        public String date;
+
+        public String getUserId() {
+            return userId;
+        }
+
+        public void setUserId(String userId) {
+            this.userId = userId;
+        }
+
+        public String getScore() {
+            return score;
+        }
+
+        public void setScore(String score) {
+            this.score = score;
+        }
+
+        public String getGameId() {
+            return gameId;
+        }
+
+        public void setGameId(String gameId) {
+            this.gameId = gameId;
+        }
+
+        public String getDate() {
+            return date;
+        }
+
+        public void setDate(String date) {
+            this.date = date;
+        }
+}
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/java-example/Task.java
new file mode 100644
index 00000000000..0ef8f0720fe
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/java-example/Task.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: creating-schema
+//   description: Creating schema example.
+//   multifile: false
+//   context_line: 124
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.io.Serializable;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public Integer score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, Integer score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+        public String userId;
+        public String userName;
+
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName, Game game) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollection<User> fullStatistics = getProgressPCollection(pipeline);
+
+        fullStatistics
+                .setCoder(CustomCoder.of())
+                .apply("User", ParDo.of(new LogOutput<>("User statistics")));
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn()));
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], Integer.valueOf(items[2]), items[3], items[4])));
+        }
+    }
+
+    static class CustomCoder extends Coder<User> {
+        private static final CustomCoder INSTANCE = new CustomCoder();
+
+        public static CustomCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(User user, OutputStream outStream) throws IOException {
+            String line = user.userId + "," + user.userName + ";" + user.game.score + "," + user.game.gameId + "," + user.game.date;
+            outStream.write(line.getBytes());
+        }
+
+        @Override
+        public User decode(InputStream inStream) throws IOException {
+            final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+            String[] params = serializedDTOs.split(";");
+            String[] user = params[0].split(",");
+            String[] game = params[1].split(",");
+            return new User(user[0], user[1], new Game(user[0], Integer.valueOf(game[0]), game[1], game[2]));
+        }
+
+        @Override
+        public List<? extends Coder<?>> getCoderArguments() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void verifyDeterministic() {
+        }
+    }
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/unit-info.yaml
index 27bebdf26d6..ea0deec7508 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/creating-schema/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: creating-schema
+name: Creating schema
+taskName: creating-schema
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/group-info.yaml
similarity index 88%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/group-info.yaml
index 27bebdf26d6..c1e26907732 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/group-info.yaml
@@ -19,11 +19,10 @@
 
 sdk:
   - Java
-  - Python
-  - Go
+id: schema-concepts
+name: Schema Concepts
 content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+- creating-schema
+- logical-type
+
+
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/logical-type/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/logical-type/description.md
new file mode 100644
index 00000000000..5ebc56a25b7
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/logical-type/description.md
@@ -0,0 +1,104 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Logical types
+
+There may be cases when you need to extend the schema type system to add custom logical types. A logical type is identified by a unique identifier and an argument. In addition to defining the underlying schema type used for storage, you also need to implement conversions to and from that type. For example, a union logical type can be represented as a row with nullable fields, where only one field is set at a time.
+
+In Java, you implement the `LogicalType` interface to define a logical type. In addition, you need to implement conversions to and from the underlying schema type by overriding the `toBaseType` and `toInputType` methods, respectively.
+
+For example, a logical type representing a nanosecond timestamp might be implemented as follows:
+
+```
+// A logical type using java.time.Instant as its input (representation) type.
+public class TimestampNanos implements LogicalType<Instant, Row> {
+  // The underlying schema used to represent rows.
+  private final Schema schema = Schema.builder().addInt64Field("seconds").addInt32Field("nanos").build();
+  @Override public String getIdentifier() { return "timestampNanos"; }
+  @Override public FieldType getBaseType() { return FieldType.row(schema); }
+
+  // Convert the representation type to the underlying Row type. Called by Beam when necessary.
+  @Override public Row toBaseType(Instant instant) {
+    return Row.withSchema(schema).addValues(instant.getEpochSecond(), instant.getNano()).build();
+  }
+
+  // Convert the underlying Row type to an Instant. Called by Beam when necessary.
+  @Override public Instant toInputType(Row base) {
+    return Instant.ofEpochSecond(base.getInt64("seconds"), base.getInt32("nanos"));
+  }
+
+     ...
+}
+```
+
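+As a usage sketch (the field names here are illustrative), the custom logical type can then be used like any other field type when building a schema and read back as its input type:
+
+```
+Schema schema = Schema.builder()
+    .addStringField("userId")
+    .addLogicalTypeField("eventTime", new TimestampNanos())
+    .build();
+
+// Given a Row "row" with this schema, the field can be read back as an Instant.
+Instant eventTime = row.getLogicalTypeValue("eventTime", Instant.class);
+```
+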
+### EnumerationType
+
+This logical type allows creating an enumeration type consisting of a set of named constants.
+
+```
+Schema schema = Schema.builder()
+               …
+     .addLogicalTypeField("color", EnumerationType.create("RED", "GREEN", "BLUE"))
+     .build();
+```
+
+The value of this field is stored in the row as an `INT32` type; however, the logical type defines a value type that lets you access the enumeration either as a string or as an integer value. For example:
+
+```
+EnumerationType.Value enumValue = enumType.valueOf("RED");
+enumValue.getValue();  // Returns 0, the integer value of the constant.
+enumValue.toString();  // Returns "RED", the string value of the constant
+```
+
+Given a row object with an enumeration field, you can also extract the field as the enumeration value.
+
+```
+EnumerationType.Value enumValue = row.getLogicalTypeValue("color", EnumerationType.Value.class);
+```
+
+### OneOfType
+
+OneOfType allows creating a disjoint union type over a set of schema fields. For example:
+
+```
+Schema schema = Schema.builder()
+               …
+     .addLogicalTypeField("oneOfField",
+        OneOfType.create(Field.of("intField", FieldType.INT32),
+                         Field.of("stringField", FieldType.STRING),
+                         Field.of("bytesField", FieldType.BYTES)))
+      .build();
+```
+
+The value of this field is stored in the row as another `Row` type, where all the fields are marked as nullable. The logical type, however, defines a `Value` object that contains an enumeration value indicating which field was set and allows getting just that field:
+
+```
+// Returns an enumeration indicating all possible case values for the enum.
+// For the above example, this will be
+// EnumerationType.create("intField", "stringField", "bytesField");
+EnumerationType oneOfEnum = oneOfType.getCaseEnumType();
+
+// Creates an instance of the union with the string field set.
+OneOfType.Value oneOfValue = oneOfType.createValue("stringField", "foobar");
+
+// Handle the oneof
+switch (oneOfValue.getCaseEnumType().toString()) {
+  case "intField":
+    return processInt(oneOfValue.getValue(Integer.class));
+  case "stringField":
+    return processString(oneOfValue.getValue(String.class));
+  case "bytesField":
+    return processBytes(oneOfValue.getValue(byte[].class));
+}
+```
+
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/logical-type/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/logical-type/unit-info.yaml
index 27bebdf26d6..5cbdc8d9e8f 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/schema-concept/logical-type/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: logical-type
+name: Logical type
+taskName: logical-type
+complexity: ADVANCED
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/select/description.md b/learning/tour-of-beam/learning-content/schema-based-transforms/select/description.md
new file mode 100644
index 00000000000..34af700de52
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/select/description.md
@@ -0,0 +1,125 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Select
+
+The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. You can choose both top-level and nested fields.
+
+The output of this transform is of type `Row`, which you can convert into any other type with a matching schema using the `Convert` transform.
+
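+For instance, here is a brief sketch of converting the selected rows back to a typed `PCollection` with `Convert` (assuming a user-defined `UserIdOnly` class whose schema matches the selected fields):
+
+```
+PCollection<UserIdOnly> ids = input
+    .apply(Select.fieldNames("userId"))
+    .apply(Convert.to(UserIdOnly.class));
+```
+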
+### Top-level fields
+
+To select fields at the top level of a schema, you specify their names. For example, you can select just the user ids from a `PCollection` of purchases with the following code:
+
+```
+PCollection<Row> rows = input.apply(Select.fieldNames("userId"));
+```
+
+Will result in the following schema:
+
+```
+Field Name       Field Type
+userId           STRING
+```
+
+### Nested fields
+
+Individual nested fields can be specified using the dot operator. For example, you can select just the postal code from the shipping address using the following:
+
+```
+PCollection<Row> rows = input.apply(Select.fieldNames("shippingAddress.postCode"));
+```
+
+Will result in the following schema:
+
+```
+Field Name       Field Type
+postCode         STRING
+```
+
+### Wildcards
+
+The `*` operator can be specified at any nesting level to represent all fields at that level. For example, to select all shipping address fields, one would write the following:
+
+```
+PCollection<Row> rows = input.apply(Select.fieldNames("shippingAddress.*"));
+```
+
+Will result in the following schema:
+
+```
+Field Name         Field Type
+streetAddress      STRING
+city               STRING
+state              nullable STRING
+country            STRING
+postCode           STRING
+
+```
+
+### Select array
+
+When selecting fields nested inside of an array, the same rule applies that each selected field appears separately as a top-level field in the resulting row. This means that if multiple fields are selected from the same nested row, each selected field will appear as its own array field.
+
+```
+PCollection<Row> rows = input.apply(Select.fieldNames( "transactions.bank", "transactions.purchaseAmount"));
+```
+
+Will result in the following schema:
+
+```
+Field Name        Field Type
+bank              ARRAY[STRING]
+purchaseAmount    ARRAY[DOUBLE]
+```
+
+### Flatten schema
+
+Another use of the `Select` transform is to flatten a nested schema into a single flat schema.
+
+```
+PCollection<Row> rows = input.apply(Select.flattenedSchema());
+```
+
+Will result in the following schema:
+
+```
+Field Name                          Field Type
+userId                              STRING
+itemId                              INT64
+shippingAddress_streetAddress       STRING
+shippingAddress_city                STRING
+shippingAddress_state               nullable STRING
+shippingAddress_country             STRING
+shippingAddress_postCode            STRING
+costCents                           INT64
+transactions_bank                   ARRAY[STRING]
+transactions_purchaseAmount         ARRAY[DOUBLE]
+
+```
+
+### Playground exercise
+
+In the playground window, you can find examples of using `Select`.
+
+You can select a top-level field and nested fields at the same time:
+
+```
+PCollection<Row> game = input.apply(Select.fieldNames("userName","game.*"));
+game.apply("User game", ParDo.of(new LogOutput<>("Game")));
+```
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/schema-based-transforms/select/java-example/Task.java b/learning/tour-of-beam/learning-content/schema-based-transforms/select/java-example/Task.java
new file mode 100644
index 00000000000..a4438d75d5c
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/select/java-example/Task.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: select
+//   description: Select example.
+//   multifile: false
+//   context_line: 126
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class Game {
+        public String userId;
+        public String score;
+        public String gameId;
+        public String date;
+
+        @SchemaCreate
+        public Game(String userId, String score, String gameId, String date) {
+            this.userId = userId;
+            this.score = score;
+            this.gameId = gameId;
+            this.date = date;
+        }
+
+        @Override
+        public String toString() {
+            return "Game{" +
+                    "userId='" + userId + '\'' +
+                    ", score='" + score + '\'' +
+                    ", gameId='" + gameId + '\'' +
+                    ", date='" + date + '\'' +
+                    '}';
+        }
+    }
+
+    // User schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class User {
+        public String userId;
+        public String userName;
+
+        public Game game;
+
+        @SchemaCreate
+        public User(String userId, String userName, Game game) {
+            this.userId = userId;
+            this.userName = userName;
+            this.game = game;
+        }
+
+        @Override
+        public String toString() {
+            return "User{" +
+                    "userId='" + userId + '\'' +
+                    ", userName='" + userName + '\'' +
+                    ", game=" + game +
+                    '}';
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema shortInfoSchema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .build();
+
+        Schema gameSchema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("score")
+                .addStringField("gameId")
+                .addStringField("date")
+                .build();
+
+        Schema dataSchema = Schema.builder()
+                .addStringField("userId")
+                .addStringField("userName")
+                .addRowField("game", gameSchema)
+                .build();
+
+
+        PCollection<User> input = getProgressPCollection(pipeline)
+                .setSchema(dataSchema,
+                        TypeDescriptor.of(User.class),
+                        // Converts a User object into a Row that matches dataSchema.
+                        user -> {
+                            Game game = user.game;
+
+                            Row gameRow = Row.withSchema(gameSchema)
+                                    .addValues(game.userId, game.score, game.gameId, game.date)
+                                    .build();
+
+                            return Row.withSchema(dataSchema)
+                                    .addValues(user.userId, user.userName, gameRow)
+                                    .build();
+                        },
+                        // Converts a Row that matches dataSchema back into a User object.
+                        row -> {
+                            String userId = row.getValue("userId");
+                            String userName = row.getValue("userName");
+                            Row game = row.getValue("game");
+
+                            String gameId = game.getValue("gameId");
+                            String gameScore = game.getValue("score");
+                            String gameDate = game.getValue("date");
+                            return new User(userId, userName,
+                                    new Game(userId, gameScore, gameId, gameDate));
+                        });
+
+        // Select [userId] and [userName]
+        PCollection<Row> shortInfo = input
+                .apply(Select.<User>fieldNames("userId", "userName").withOutputSchema(shortInfoSchema))
+                .apply("User short info", ParDo.of(new LogOutput<>("Short Info")));
+
+        // Select user [game]
+        PCollection<Row> game = input
+                .apply(Select.fieldNames("game.*"))
+                .apply("User game", ParDo.of(new LogOutput<>("Game")));
+
+        // Flattened row, select all fields
+        PCollection<Row> flattened = input
+                .apply(Select.flattenedSchema())
+                .apply("User flatten row", ParDo.of(new LogOutput<>("Flattened")));
+
+
+        pipeline.run();
+    }
+
+    public static PCollection<User> getProgressPCollection(Pipeline pipeline) {
+        PCollection<String> rides = pipeline.apply(TextIO.read().from("gs://apache-beam-samples/game/small/gaming_data.csv"));
+        final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample = Sample.fixedSizeGlobally(100);
+        return rides.apply(sample).apply(Flatten.iterables()).apply(ParDo.of(new ExtractUserProgressFn()));
+    }
+
+    static class ExtractUserProgressFn extends DoFn<String, User> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            String[] items = c.element().split(",");
+            c.output(new User(items[0], items[1], new Game(items[0], items[2], items[3], items[4])));
+        }
+    }
+
+    static class LogOutput<T> extends DoFn<T, T> {
+
+        private final String prefix;
+
+        LogOutput() {
+            this.prefix = "Processing element";
+        }
+
+        LogOutput(String prefix) {
+            this.prefix = prefix;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            LOG.info(prefix + ": {}", c.element());
+        }
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/schema-based-transforms/select/unit-info.yaml
similarity index 87%
copy from learning/tour-of-beam/learning-content/content-info.yaml
copy to learning/tour-of-beam/learning-content/schema-based-transforms/select/unit-info.yaml
index 27bebdf26d6..50f503f12cd 100644
--- a/learning/tour-of-beam/learning-content/content-info.yaml
+++ b/learning/tour-of-beam/learning-content/schema-based-transforms/select/unit-info.yaml
@@ -19,11 +19,7 @@
 
 sdk:
   - Java
-  - Python
-  - Go
-content:
-  - introduction
-  - common-transforms
-  - core-transforms
-  - windowing
-  - triggers
\ No newline at end of file
+id: select
+name: Select
+taskName: select
+complexity: ADVANCED
\ No newline at end of file