Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/16 16:19:23 UTC

[GitHub] [beam] kennknowles commented on a diff in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

kennknowles commented on code in PR #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r899274918


##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java:
##########
@@ -204,8 +213,8 @@ public static void main(String[] args) throws Exception {
         pipeline
             .apply(
                 PubsubIO.readStrings()
-                    .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                    .fromTopic(options.getTopic()))
+                    .fromSubscription(options.getSubscription())

Review Comment:
   Same here: read from the subscription only when the user provides `options.getSubscription()`, and otherwise read from the topic.



##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java:
##########
@@ -346,9 +350,6 @@ public void processElement(ProcessContext c) {
                 configureSessionWindowWrite()));
     // [END DocInclude_Rewindow]
 
-    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
-    // command line.
-    PipelineResult result = pipeline.run();
-    exampleUtils.waitToFinish(result);
+    pipeline.run(options);

Review Comment:
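   This really changes how a user interacts with the example: `main()` no longer calls `exampleUtils.waitToFinish(result)`, so it no longer waits for the pipeline to finish or captures cancellation requests from the command line.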



##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java:
##########
@@ -242,13 +242,20 @@ public static void main(String[] args) throws Exception {
     ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = runGameStats(options, pipeline);
+    exampleUtils.waitToFinish(result);
+  }
+
+  static PipelineResult runGameStats(Options options, Pipeline pipeline) {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents =
         pipeline
             .apply(
                 PubsubIO.readStrings()
-                    .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                    .fromTopic(options.getTopic()))
+                    .fromSubscription(options.getSubscription())

Review Comment:
   But as an easy-to-use example, it should also work when reading from the topic. Can you please make it so that it can do one or the other? Reading from a subscription is the recommended approach for production, and it is good for ITs. But reading from a topic is fine for development, and it makes the example easier to try and play with.
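
A minimal, dependency-free sketch of the "one or the other" choice, to make the intended behavior concrete. It is not the PR's code: `PubsubSourceChoice`, `Source`, `isProvided`, and `choose` are hypothetical names, the resource path is a placeholder, and treating a blank value as "not provided" is an assumption. In the example itself, the two branches would presumably map to `fromSubscription(...)` and `fromTopic(...)` on the `PubsubIO.readStrings()` builder shown in the diff.

```java
/** Models the rule: use the subscription if the user provided one, else the topic. */
final class PubsubSourceChoice {

  /** The selected source: whether it is a subscription, and its resource path. */
  static final class Source {
    final boolean isSubscription;
    final String path;

    Source(boolean isSubscription, String path) {
      this.isSubscription = isSubscription;
      this.path = path;
    }
  }

  /** Assumption: an option counts as provided when it is non-null and not blank. */
  static boolean isProvided(String value) {
    return value != null && !value.trim().isEmpty();
  }

  /** Prefers the subscription, falls back to the topic, and fails if neither is set. */
  static Source choose(String subscription, String topic) {
    if (isProvided(subscription)) {
      return new Source(true, subscription.trim());
    }
    if (isProvided(topic)) {
      return new Source(false, topic.trim());
    }
    throw new IllegalArgumentException("Provide either a subscription or a topic");
  }

  public static void main(String[] args) {
    // Hypothetical topic path; no subscription was given, so the topic is used.
    Source source = choose(null, "projects/my-project/topics/game-events");
    System.out.println((source.isSubscription ? "subscription: " : "topic: ") + source.path);
  }
}
```

Failing fast when neither option is set is one possible design; the example could instead rely on option validation to require at least one of the two.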



##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java:
##########
@@ -115,9 +114,14 @@ private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWind
   public static void main(String[] args) throws Exception {
 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    ExampleUtils exampleUtils = new ExampleUtils(options);
+    runStatefulTeamScore(options);

Review Comment:
   Thinking about this at a high level, we can test it like this: instead of a static function such as `runStatefulTeamScore` that creates the pipeline and runs it, move the logic into a PTransform `TeamScore`, with the logic in its `expand()` method. Then ITs and `main()` can run it in different ways. This will test the core logic. It will not test the command line interface.
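
To illustrate the shape of that suggestion without pulling in Beam, here is a dependency-free analogy. The `Transform` interface is a hypothetical stand-in for Beam's `PTransform`, and the `"team,score"` input format and the per-team sum are assumptions made only for the sketch. In the real example, `TeamScore` would extend `PTransform` and its `expand()` would take and return `PCollection`s.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a PTransform: an input collection in, an output out. */
interface Transform<InT, OutT> {
  OutT expand(InT input);
}

/** Core logic only: sums scores per team from "team,score" lines (assumed format). */
final class TeamScore implements Transform<List<String>, Map<String, Integer>> {
  @Override
  public Map<String, Integer> expand(List<String> events) {
    Map<String, Integer> totals = new TreeMap<>();
    for (String event : events) {
      String[] parts = event.split(",");
      if (parts.length != 2) {
        continue; // skip malformed lines
      }
      try {
        totals.merge(parts[0].trim(), Integer.parseInt(parts[1].trim()), Integer::sum);
      } catch (NumberFormatException e) {
        // skip lines whose score is not an integer
      }
    }
    return totals;
  }
}

/** main() wires input to the transform; a test can call expand() directly instead. */
final class TeamScoreMain {
  public static void main(String[] args) {
    List<String> events = Arrays.asList("red,3", "blue,5", "red,4");
    System.out.println(new TeamScore().expand(events)); // prints {blue=5, red=7}
  }
}
```

Because the logic lives in `expand()`, a test can feed it a fixed input and check the output without going through argument parsing, which is why this approach covers the core logic but not the command line interface.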



##########
examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java:
##########
@@ -235,10 +245,7 @@ public static void main(String[] args) throws Exception {
                 options.getLeaderBoardTableName() + "_user",
                 configureGlobalWindowBigQueryWrite()));
 
-    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
-    // command line.
-    PipelineResult result = pipeline.run();
-    exampleUtils.waitToFinish(result);
+    pipeline.run(options);

Review Comment:
   Same here. I am not so sure this is good for a user trying the example.


