Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/29 04:27:39 UTC

[GitHub] [beam] rezarokni commented on a change in pull request #12097: [BEAM-10327] Create a pattern that shows use of Schema using Joins

rezarokni commented on a change in pull request #12097:
URL: https://github.com/apache/beam/pull/12097#discussion_r446763798



##########
File path: website/www/site/content/en/documentation/patterns/schema.md
##########
@@ -0,0 +1,56 @@
+---
+title: "Schema Patterns"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Schema Patterns
+
+The samples on this page describe common patterns using Schemas.
+A Schema is a way to represent records with a fixed structure. Schemas are useful because Beam sources commonly produce JSON, Avro, or database row objects, all of which have a well-defined structure.

Review comment:
       Maybe just borrow the text from:
   https://beam.apache.org/documentation/programming-guide/#what-is-a-schema
   
   Schemas provide us a type-system for Beam records that is independent of any specific programming-language type. There might be multiple Java classes that all have the same schema (for example a Protocol-Buffer class or a POJO class), and Beam will allow us to seamlessly convert between these types. Schemas also provide a simple way to reason about types across different programming-language APIs.
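To make the quoted point concrete (a schema depends on a type's field names and types, not on the Java class itself), here is a plain-Java sketch that does not use Beam. It is only a rough analogue of the reflection-based inference Beam performs for POJOs with `JavaFieldSchema`, under the simplifying assumption that a schema is a map from public field name to field type, sorted by name. Real Beam schemas are ordered, so this sketch deliberately ignores field order. The classes `EmailUserA` and `EmailUserB` are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class SchemaSketch {
  // Two hypothetical, unrelated classes that declare the same public fields.
  public static class EmailUserA {
    public String name;
    public String email;
  }

  public static class EmailUserB {
    public String email;
    public String name;
  }

  // Simplified "schema": public instance field name -> type name.
  // getFields() does not guarantee declaration order, so a TreeMap sorts by name.
  static Map<String, String> inferSchema(Class<?> clazz) {
    Map<String, String> schema = new TreeMap<>();
    for (Field f : clazz.getFields()) {
      if (!Modifier.isStatic(f.getModifiers())) {
        schema.put(f.getName(), f.getType().getSimpleName());
      }
    }
    return schema;
  }

  public static void main(String[] args) {
    Map<String, String> a = inferSchema(EmailUserA.class);
    Map<String, String> b = inferSchema(EmailUserB.class);
    System.out.println(a);           // {email=String, name=String}
    System.out.println(a.equals(b)); // true: different classes, same structure
  }
}
```

In Beam itself, this is what lets the SDK treat such classes interchangeably and convert between them and `Row`.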

##########
File path: examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
##########
@@ -154,6 +156,73 @@ public void testCoGroupByKeyTuple() throws IOException {
     p.run();
   }
 
+  /* Tests SchemaJoinPattern */
+  @Test
+  public void testSchemaJoinPattern() {
+    // [START SchemaJoinPatternCreate]
+    // Define Schemas
+    Schema emailSchema =
+        Schema.of(
+            Schema.Field.of("name", Schema.FieldType.STRING),
+            Schema.Field.of("email", Schema.FieldType.STRING));
+
+    Schema phoneSchema =
+        Schema.of(
+            Schema.Field.of("name", Schema.FieldType.STRING),
+            Schema.Field.of("phone", Schema.FieldType.STRING));
+
+    // Create User Data Collections
+    final List<Row> emailUsers =
+        Arrays.asList(
+            Row.withSchema(emailSchema).addValue("person1").addValue("person1@example.com").build(),
+            Row.withSchema(emailSchema).addValue("person2").addValue("person2@example.com").build(),
+            Row.withSchema(emailSchema).addValue("person3").addValue("person3@example.com").build(),
+            Row.withSchema(emailSchema)
+                .addValue("person4")
+                .addValue("person4@example.com")
+                .build());
+
+    final List<Row> phoneUsers =
+        Arrays.asList(
+            Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
+            Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
+            Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
+            Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build());
+
+    // [END SchemaJoinPatternCreate]
+
+    PCollection<String> actualFormattedResult =
+        Snippets.SchemaJoinPattern.main(p, emailUsers, phoneUsers, emailSchema, phoneSchema);

Review comment:
       As this is example snippet code, consider whether it would be easier for the reader if the code were inlined here rather than abstracted into a class.

##########
File path: examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
##########
@@ -154,6 +156,73 @@ public void testCoGroupByKeyTuple() throws IOException {
     p.run();
   }
 
+  /* Tests SchemaJoinPattern */
+  @Test
+  public void testSchemaJoinPattern() {
+    // [START SchemaJoinPatternCreate]
+    // Define Schemas
+    Schema emailSchema =
+        Schema.of(
+            Schema.Field.of("name", Schema.FieldType.STRING),
+            Schema.Field.of("email", Schema.FieldType.STRING));
+
+    Schema phoneSchema =
+        Schema.of(
+            Schema.Field.of("name", Schema.FieldType.STRING),
+            Schema.Field.of("phone", Schema.FieldType.STRING));
+
+    // Create User Data Collections
+    final List<Row> emailUsers =

Review comment:
       Consider whether adding a non-matching record would help the sample, for example a name that is missing from either `emailUsers` or `phoneUsers`, so that the reader is clear on the output when no match occurs.
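To illustrate what such a non-match would show, here is a plain-Java sketch of inner-join semantics on the `name` key. It is not Beam's API: each row is modeled as a `Map<String, String>`, and each result pairs the matching left and right rows, loosely mirroring the `lhs`/`rhs` fields of the rows produced by Beam's schema `Join`. The `person5` entry is hypothetical; it has an email but no phone number, so an inner join emits nothing for it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InnerJoinSketch {
  // One joined result: the matching left (email) row and right (phone) row.
  public static final class Joined {
    final Map<String, String> lhs;
    final Map<String, String> rhs;

    Joined(Map<String, String> lhs, Map<String, String> rhs) {
      this.lhs = lhs;
      this.rhs = rhs;
    }

    @Override
    public String toString() {
      return lhs + " | " + rhs;
    }
  }

  // Builds a two-field row; LinkedHashMap keeps the field order stable when printed.
  static Map<String, String> row(String f1, String v1, String f2, String v2) {
    Map<String, String> r = new LinkedHashMap<>();
    r.put(f1, v1);
    r.put(f2, v2);
    return r;
  }

  // Inner hash join: index the right side by key, then probe it with each left row.
  static List<Joined> innerJoin(
      List<Map<String, String>> left, List<Map<String, String>> right, String key) {
    Map<String, List<Map<String, String>>> index = new HashMap<>();
    for (Map<String, String> r : right) {
      index.computeIfAbsent(r.get(key), k -> new ArrayList<>()).add(r);
    }
    List<Joined> out = new ArrayList<>();
    for (Map<String, String> l : left) {
      // A left row whose key has no match on the right contributes no output at all.
      for (Map<String, String> r :
          index.getOrDefault(l.get(key), Collections.<Map<String, String>>emptyList())) {
        out.add(new Joined(l, r));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map<String, String>> emails = new ArrayList<>();
    emails.add(row("name", "person1", "email", "person1@example.com"));
    emails.add(row("name", "person5", "email", "person5@example.com")); // hypothetical: no phone
    List<Map<String, String>> phones = new ArrayList<>();
    phones.add(row("name", "person1", "phone", "111-222-3333"));

    // Prints one line; person5 is silently dropped:
    // {name=person1, email=person1@example.com} | {name=person1, phone=111-222-3333}
    for (Joined j : innerJoin(emails, phones, "name")) {
      System.out.println(j);
    }
  }
}
```

In the Beam snippet, the equivalent change would be to add a row such as `person5` to only one of the two lists and point out that it is absent from the joined output. If the sample should keep unmatched rows instead, Beam's schema `Join` also offers outer variants such as `Join.leftOuterJoin`.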

##########
File path: examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
##########
@@ -914,4 +917,71 @@ public void process(ProcessContext c) {
       return result;
     }
   }
+
+  public static class SchemaJoinPattern {
+    public static PCollection<String> main(
+        Pipeline p,
+        final List<Row> emailUsers,
+        final List<Row> phoneUsers,
+        Schema emailSchema,
+        Schema phoneSchema) {
+      // [START SchemaJoinPatternJoin]
+      // Create/Read Schema PCollections
+      PCollection<Row> emailList =
+          p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));
+
+      PCollection<Row> phoneList =
+          p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));
+
+      // Perform Join
+      PCollection<Row> resultRow =
+          emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));
+
+      // Preview Result
+      resultRow.apply(
+          "Preview Result",
+          MapElements.into(TypeDescriptors.strings())
+              .via(
+                  x -> {
+                    System.out.println(x);
+                    return "";
+                  }));
+
+      /* Sample Output From the pipeline:

Review comment:
       Because this class is abstracted, the sample output can be difficult to tie back to its input, since the data is not in this snippet but is defined later in the test. Consider whether inlining the code would make the example easier to follow.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org