Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/16 17:07:59 UTC

[GitHub] [beam] fernando-wizeline commented on a diff in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

fernando-wizeline commented on code in PR #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r1024278681


##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java:
##########
@@ -114,31 +115,14 @@ private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWind
     return tableConfigure;
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void applyStatefulTeamScore(Pipeline p, Options options) throws IOException {
 
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    // Enforce that this pipeline is always run in streaming mode.
-    options.setStreaming(true);
-    ExampleUtils exampleUtils = new ExampleUtils(options);
-    Pipeline pipeline = Pipeline.create(options);
+    PubsubIO.Read<String> records = readRecordsFromPubSub(options);
 
-    pipeline
-        // Read game events from Pub/Sub using custom timestamps, which are extracted from the
-        // pubsub data elements, and parse the data.
-        .apply(
-            PubsubIO.readStrings()
-                .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                .fromTopic(options.getTopic()))
-        .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-        // Create <team, GameActionInfo> mapping. UpdateTeamScore uses team name as key.
-        .apply(
-            "MapTeamAsKey",
-            MapElements.into(
-                    TypeDescriptors.kvs(
-                        TypeDescriptors.strings(), TypeDescriptor.of(GameActionInfo.class)))
-                .via((GameActionInfo gInfo) -> KV.of(gInfo.team, gInfo)))
-        // Outputs a team's score every time it passes a new multiple of the threshold.
-        .apply("UpdateTeamScore", ParDo.of(new UpdateTeamScoreFn(options.getThresholdScore())))
+    p.apply(records)
+        // Create <team, GameActionInfo> mapping & Outputs a team's score every time it passes a new
+        // multiple of the threshold
+        .apply(new TeamScore(options))
         // Write the results to BigQuery.
         .apply(

Review Comment:
   Hi Kiley!
   I can add it to the transform. I left it as a separate step to show the three stages of the pipeline: the E, the T, and the L (extract, transform, load).
   Let me know if it makes more sense to move the BigQuery step into the TeamScore transform.
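
   A rough plain-Java sketch of that three-step shape, for illustration only. This is an analogy and not Beam code: the class, the method names, and the sample records are all hypothetical, and the in-memory list merely stands in for the BigQuery sink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy of the pipeline shape; none of these names come from Beam or the PR.
class EtlShapeSketch {

  // E: stands in for the Pub/Sub read; returns hypothetical "team,score" records.
  static List<String> extract() {
    return List.of("red,5", "blue,3", "red,7");
  }

  // T: stands in for the TeamScore composite; sums scores per team.
  static Map<String, Integer> transform(List<String> records) {
    Map<String, Integer> totals = new LinkedHashMap<>();
    for (String record : records) {
      String[] parts = record.split(",");
      totals.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
    }
    return totals;
  }

  // L: stands in for the BigQuery write; appends one row per team to an in-memory sink.
  static void load(Map<String, Integer> totals, List<String> sink) {
    totals.forEach((team, score) -> sink.add(team + "=" + score));
  }

  public static void main(String[] args) {
    List<String> sink = new ArrayList<>();
    // All three stages stay visible at the call site.
    load(transform(extract()), sink);
    System.out.println(sink);
  }
}
```

   Folding the write into the transform would amount to calling load from inside transform: the call site gets shorter, but the sink is no longer visible as its own step.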


