Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/04 16:38:03 UTC

[GitHub] [beam] fernando-wizeline opened a new pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

fernando-wizeline opened a new pull request #17015:
URL: https://github.com/apache/beam/pull/17015


   This is a companion PR to https://github.com/apache/beam/pull/16609
   
   This PR adds integration tests for several classes under the examples/complete/game folder. 
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kileys commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
kileys commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r823277899



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
##########
@@ -75,6 +69,9 @@
 })
 public class UserScore {
 
+  // TODO: remove

Review comment:
       Remove

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_FOR_UPLOAD_TESTS =
+      "gs://temp-storage-for-end-to-end-tests/UserScoreIT/game/"
+          + UserScoreIT.class.getSimpleName();
+  private UserScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(UserScoreOptions.class);
+  private static String projectId;
+  // temp-storage-for-end-to-end-tests
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+
+  public interface UserScoreOptions extends TestPipelineOptions, UserScore.Options {}
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupPipelineOptions();
+  }
+
+  @Test
+  public void testE2EUserScore() throws Exception {
+    UserScore.runUserScore(options, testPipeline);
+
+    testPipeline.run().waitUntilFinish();

Review comment:
       What results do we have to validate?
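
       One possible answer, sketched under assumptions: compute the expected per-user totals from a small, known input in plain Java and compare them with what the pipeline writes. `ExpectedUserScores` and `sumByUser` are hypothetical names, the sample rows are made up, and the `user,team,score,timestamp` line layout is an assumption about the input file. This is not Beam code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical reference computation (not part of Beam): expected per-user score totals. */
final class ExpectedUserScores {
  private ExpectedUserScores() {}

  /** Sums the score column per user; assumes lines shaped like "user,team,score,timestamp". */
  static Map<String, Integer> sumByUser(List<String> csvLines) {
    Map<String, Integer> totals = new TreeMap<>();
    for (String line : csvLines) {
      String[] parts = line.split(",", -1);
      if (parts.length < 3) {
        continue; // Skip malformed rows instead of failing the whole computation.
      }
      try {
        totals.merge(parts[0].trim(), Integer.parseInt(parts[2].trim()), Integer::sum);
      } catch (NumberFormatException e) {
        // Non-numeric score: treat the row as malformed and skip it.
      }
    }
    return totals;
  }

  public static void main(String[] args) {
    // Made-up sample rows for illustration only.
    List<String> sample =
        Arrays.asList(
            "user0_MagentaKangaroo,MagentaKangaroo,3,1447955630000",
            "user0_MagentaKangaroo,MagentaKangaroo,4,1447955630000",
            "user13_ApricotQuokka,ApricotQuokka,15,1447955630000",
            "this row is malformed");
    System.out.println(sumByUser(sample));
  }
}
```

       A test could then read the pipeline's output files and assert that each user's total matches this map, which keeps the check independent of the runner.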

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */

Review comment:
       ```suggestion
   /** Integration tests for {@link UserScore}. */
   ```

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_FOR_UPLOAD_TESTS =
+      "gs://temp-storage-for-end-to-end-tests/UserScoreIT/game/"
+          + UserScoreIT.class.getSimpleName();
+  private UserScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(UserScoreOptions.class);
+  private static String projectId;
+  // temp-storage-for-end-to-end-tests
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+
+  public interface UserScoreOptions extends TestPipelineOptions, UserScore.Options {}
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupPipelineOptions();
+  }
+
+  @Test
+  public void testE2EUserScore() throws Exception {
+    UserScore.runUserScore(options, testPipeline);
+
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @After
+  public void cleanupTestEnvironment() throws Exception {}

Review comment:
       Remove this method if no cleanup is needed.

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_FOR_UPLOAD_TESTS =
+      "gs://temp-storage-for-end-to-end-tests/UserScoreIT/game/"
+          + UserScoreIT.class.getSimpleName();
+  private UserScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(UserScoreOptions.class);
+  private static String projectId;
+  // temp-storage-for-end-to-end-tests
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+
+  public interface UserScoreOptions extends TestPipelineOptions, UserScore.Options {}
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupPipelineOptions();
+  }
+
+  @Test
+  public void testE2EUserScore() throws Exception {
+    UserScore.runUserScore(options, testPipeline);
+
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @After
+  public void cleanupTestEnvironment() throws Exception {}
+
+  private void setupPipelineOptions() {
+    options.as(GcpOptions.class).setProject(projectId);
+    options.as(DirectOptions.class).setBlockOnRun(false);

Review comment:
       We shouldn't need to add a direct runner specific option. Is there an issue with the direct runner?

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */

Review comment:
       ```suggestion
   /** Integration tests for {@link GameStats}. */
   ```

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";

Review comment:
       Let's use a larger dataset

##########
File path: examples/java/build.gradle
##########
@@ -65,6 +65,13 @@ dependencies {
   implementation library.java.google_api_client
   implementation library.java.google_api_services_bigquery
   implementation library.java.google_api_services_pubsub
+  // GCP PubSub client is used in LeaderBoardIT and StatefulTeamScoreIT
+  implementation library.java.google_cloud_pubsub

Review comment:
       Can we add this only as a `testImplementation` dependency?

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_FOR_UPLOAD_TESTS =
+      "gs://temp-storage-for-end-to-end-tests/UserScoreIT/game/"
+          + UserScoreIT.class.getSimpleName();
+  private UserScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(UserScoreOptions.class);
+  private static String projectId;
+  // temp-storage-for-end-to-end-tests
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);

Review comment:
       UserScore.java should create the pipeline. Is there a reason we need this? Also, why is it a `@Rule`, and why is it transient?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -242,13 +242,20 @@ public static void main(String[] args) throws Exception {
     ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = runGameStats(options, pipeline);
+    exampleUtils.waitToFinish(result);
+  }
+
+  static PipelineResult runGameStats(Options options, Pipeline pipeline) {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents =
         pipeline
             .apply(
                 PubsubIO.readStrings()
-                    .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                    .fromTopic(options.getTopic()))
+                    .fromSubscription(options.getSubscription())

Review comment:
       Is there a reason we need to read from the subscription rather than the topic?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
##########
@@ -244,17 +241,22 @@ public static void main(String[] args) throws Exception {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    runUserScore(options, pipeline);
+    pipeline.run().waitUntilFinish();
+  }
+
+  static void runUserScore(Options options, Pipeline pipeline) {
+
     // Read events from a text file and parse them.
+
     pipeline
         .apply(TextIO.read().from(options.getInput()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-        // Extract and sum username/score pairs from the event data.

Review comment:
       Keep this comment.

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
##########
@@ -28,16 +30,8 @@
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.options.*;

Review comment:
       Keep the explicit single-class imports instead of a wildcard import.

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_FOR_UPLOAD_TESTS =
+      "gs://temp-storage-for-end-to-end-tests/UserScoreIT/game/"
+          + UserScoreIT.class.getSimpleName();

Review comment:
       This might overwrite results if the test is run in parallel. Can you take a look at https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsIT.java#L58?
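
       A minimal sketch of the idea, assuming a timestamp plus a random suffix is enough to keep concurrent runs apart. `UniqueOutputPaths` and `uniquePrefix` are hypothetical names that use only the JDK; this is not the code in the linked test.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

/** Hypothetical helper (not part of Beam): builds a per-run output prefix. */
final class UniqueOutputPaths {
  private static final DateTimeFormatter STAMP =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS").withZone(ZoneOffset.UTC);

  private UniqueOutputPaths() {}

  /** Joins the root, the test name, a UTC timestamp, and a random suffix into one prefix. */
  static String uniquePrefix(String tempRoot, String testName) {
    String root = tempRoot.endsWith("/") ? tempRoot : tempRoot + "/";
    // The random suffix separates runs that start within the same millisecond.
    String suffix = UUID.randomUUID().toString().substring(0, 8);
    return root + testName + "-" + STAMP.format(Instant.now()) + "-" + suffix + "/";
  }

  public static void main(String[] args) {
    // The bucket name is the one from the diff above; any writable root would do.
    String root = "gs://temp-storage-for-end-to-end-tests";
    System.out.println(uniquePrefix(root, "UserScoreIT"));
    System.out.println(uniquePrefix(root, "UserScoreIT"));
  }
}
```

       Computing the prefix per test run, rather than in a static constant shared by every run, is what prevents two parallel jobs from writing to the same location.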

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_FOR_UPLOAD_TESTS =
+      "gs://temp-storage-for-end-to-end-tests/UserScoreIT/game/"
+          + UserScoreIT.class.getSimpleName();
+  private UserScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(UserScoreOptions.class);
+  private static String projectId;
+  // temp-storage-for-end-to-end-tests

Review comment:
       ?

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+  private static final String EVENTS_TOPIC_NAME = "events";
+  public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+  private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+  public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+      "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+  private GameStatsOptions options =
+      TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+  private static String pubsubEndpoint;
+  private @Nullable ManagedChannel channel = null;
+  private @Nullable TransportChannelProvider channelProvider = null;
+  private @Nullable TopicAdminClient topicAdmin = null;
+  private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+  private @Nullable TopicPath eventsTopicPath = null;
+  private @Nullable SubscriptionPath subscriptionPath = null;
+  private String projectId;
+  private static final String TOPIC_PREFIX = "gamestats-";
+  private BigqueryClient bqClient;
+  private final String OUTPUT_DATASET = "game_stats_e2e";
+
+  public interface GameStatsOptions extends TestPipelineOptions, GameStats.Options {};
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupBigQuery();

Review comment:
       Can you look into using ExampleUtils for the setup/teardown?
   https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
##########
@@ -244,17 +241,22 @@ public static void main(String[] args) throws Exception {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    runUserScore(options, pipeline);
+    pipeline.run().waitUntilFinish();
+  }
+
+  static void runUserScore(Options options, Pipeline pipeline) {
+
     // Read events from a text file and parse them.
+
     pipeline
         .apply(TextIO.read().from(options.getInput()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
-        // Extract and sum username/score pairs from the event data.
         .apply("ExtractUserScore", new ExtractAndSumScore("user"))
         .apply(
             "WriteUserScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), false));
-
-    // Run the batch pipeline.
-    pipeline.run().waitUntilFinish();

Review comment:
       It makes more sense to leave this call in the method that says "runUserScore"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1084898694


   Run Java Examples_Spark





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1071040382


   Run Java Examples on Dataflow Runner V2





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1072863736


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1084916913


   Run Java Examples on Dataflow Runner V2





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1075802474


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1084840883


   Run Java Examples_Direct





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1069434175


   Run Java Examples_Direct





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1075657411


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1072833816


   Run Java Examples_Direct






[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r826117880



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -242,13 +242,20 @@ public static void main(String[] args) throws Exception {
     ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = runGameStats(options, pipeline);
+    exampleUtils.waitToFinish(result);
+  }
+
+  static PipelineResult runGameStats(Options options, Pipeline pipeline) {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents =
         pipeline
             .apply(
                 PubsubIO.readStrings()
-                    .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                    .fromTopic(options.getTopic()))
+                    .fromSubscription(options.getSubscription())

Review comment:
       Oh, if I'm not mistaken, when no subscription is provided for reading, the API will create its own. I'm passing in the subscription I created during the IT setup so that I can delete it later.
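
For illustration only (this is not code from the PR): a minimal sketch of how a test might generate a per-run resource name, so that the topic and subscription it creates in setup can be found and deleted in teardown. The class and method names are hypothetical. The prefix and timestamp layout loosely follow the `TOPIC_PREFIX` and `DATETIME_FORMAT` constants visible in the quoted test, but the sketch uses `java.time` rather than Joda-Time, and the real naming logic is not shown in this thread.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper that builds unique names for per-test cloud resources. */
final class TestResourceNames {
  // java.time uses lowercase "yyyy" for the calendar year.
  private static final DateTimeFormatter STAMP =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS");

  private TestResourceNames() {}

  /** Returns prefix + base + "-" + timestamp + "-" + random non-negative long. */
  static String uniqueName(String prefix, String base) {
    String stamp = LocalDateTime.now().format(STAMP);
    long suffix = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    return prefix + base + "-" + stamp + "-" + suffix;
  }
}
```

A test could keep the generated name in a field during setup and reuse it in teardown, which is the "so that I can delete it later" part of the comment above.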







[GitHub] [beam] kileys commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
kileys commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1063582582


   Can you post the test results for these examples on different runners? Like https://github.com/apache/beam/pull/16609#issuecomment-1055888591





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1067392827


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r835962861



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -242,13 +242,20 @@ public static void main(String[] args) throws Exception {
     ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = runGameStats(options, pipeline);
+    exampleUtils.waitToFinish(result);
+  }
+
+  static PipelineResult runGameStats(Options options, Pipeline pipeline) {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents =
         pipeline
             .apply(
                 PubsubIO.readStrings()
-                    .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                    .fromTopic(options.getTopic()))
+                    .fromSubscription(options.getSubscription())

Review comment:
       If it is ok, I would like to keep using the subscription created within the IT instead of the one created automatically when trying to read from the topic. Not sure why, but it looks like reading from the topic is a blocking operation and the pipeline never reaches the following step.







[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1069674347


   Run Java Examples on Dataflow Runner V2





[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r836071102



##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+  private static final String EVENTS_TOPIC_NAME = "events";
+  public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+  private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+  public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+      "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+  private GameStatsOptions options =
+      TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+  private static String pubsubEndpoint;
+  private @Nullable ManagedChannel channel = null;
+  private @Nullable TransportChannelProvider channelProvider = null;
+  private @Nullable TopicAdminClient topicAdmin = null;
+  private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+  private @Nullable TopicPath eventsTopicPath = null;
+  private @Nullable SubscriptionPath subscriptionPath = null;
+  private String projectId;
+  private static final String TOPIC_PREFIX = "gamestats-";
+  private BigqueryClient bqClient;
+  private final String OUTPUT_DATASET = "game_stats_e2e";
+
+  public interface GameStatsOptions extends TestPipelineOptions, GameStats.Options {};
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupBigQuery();

Review comment:
       I've tried setting up Pub/Sub and BigQuery using ExampleUtils, but neither the topic/subscription nor the dataset gets created quickly enough for the reading/writing to take place.
   The way they're being set up and torn down right now works as expected. Is it ok if I stick with that approach?
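
For illustration only (not code from this PR): when a freshly created resource is not visible right away, one common workaround is to poll for it with exponential backoff before starting the pipeline. The quoted test imports Beam's `FluentBackoff`, but its usage is not shown in this thread, so the sketch below sticks to the JDK. `ResourcePoller` and `waitUntilReady` are hypothetical names, and the readiness check itself is left to the caller.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a readiness check with exponential backoff. */
final class ResourcePoller {
  private ResourcePoller() {}

  /**
   * Returns true as soon as {@code ready} reports true. Returns false if all
   * {@code maxAttempts} checks fail or the thread is interrupted while waiting.
   */
  static boolean waitUntilReady(BooleanSupplier ready, int maxAttempts, long initialDelayMillis) {
    long delay = initialDelayMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (ready.getAsBoolean()) {
        return true;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(delay);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt for callers
          return false;
        }
        delay *= 2; // double the wait before the next check
      }
    }
    return false;
  }
}
```

In a setup method, the supplier would wrap whatever "does the topic/subscription/dataset exist yet?" call the client library offers, and the test would fail fast if the helper returns false.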







[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r835491915



##########
File path: examples/java/build.gradle
##########
@@ -91,10 +91,18 @@ dependencies {
   implementation "org.apache.commons:commons-lang3:3.9"
   implementation "org.apache.httpcomponents:httpclient:4.5.13"
   implementation "org.apache.httpcomponents:httpcore:4.4.13"
+  implementation project(path: ":runners:flink:1.11")

Review comment:
       I added that one so that I could use FlinkPipelineOptions to set fasterCopy to true. When running the example on Flink, it recommends setting that to true. Is it ok to leave it there? I'm not sure about the pros and cons.







[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r836685824



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
##########
@@ -170,7 +176,7 @@ public static void main(String[] args) throws Exception {
         // Extract and sum teamname/score pairs from the event data.
         .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
         .apply(
-            "WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), true));
+            "WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), false));

Review comment:
       This is just to write the results to one file per round instead of several files per window. The windowing itself is not being changed, if I'm not mistaken.
   I added an extra option and set it to true by default to keep the original functionality.







[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r826139819



##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";

Review comment:
       What is the recommended dataset to use in this case? I see gs://apache-beam-samples/game/gaming_data1.csv and gs://apache-beam-samples/game/gaming_data2.csv. Are those ok to use?







[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1067451135


   Run Java Examples_Spark





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1067455085


   Run Java Examples on Dataflow Runner V2





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1069570646


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1083351243


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1084835642


   Run Java Examples_Flink





[GitHub] [beam] fernando-wizeline commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r826215537



##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+  private static final String EVENTS_TOPIC_NAME = "events";
+  public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+  private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+  public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+      "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+  private GameStatsOptions options =
+      TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+  private static String pubsubEndpoint;
+  private @Nullable ManagedChannel channel = null;
+  private @Nullable TransportChannelProvider channelProvider = null;
+  private @Nullable TopicAdminClient topicAdmin = null;
+  private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+  private @Nullable TopicPath eventsTopicPath = null;
+  private @Nullable SubscriptionPath subscriptionPath = null;
+  private String projectId;
+  private static final String TOPIC_PREFIX = "gamestats-";
+  private BigqueryClient bqClient;
+  private final String OUTPUT_DATASET = "game_stats_e2e";
+
+  public interface GameStatsOptions extends TestPipelineOptions, GameStats.Options {};
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupBigQuery();

Review comment:
       Regarding the setup and teardown of the BigQuery dataset: which part of ExampleUtils should I look at? The test only requires the dataset; WriteWindowedToBigQuery provides the rest of the configuration needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1068201971


   Run Java Examples_Direct





[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1069661310


   Run Java Examples_Spark





[GitHub] [beam] kileys commented on a change in pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
kileys commented on a change in pull request #17015:
URL: https://github.com/apache/beam/pull/17015#discussion_r833581164



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -238,17 +238,26 @@ public static void main(String[] args) throws Exception {
 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     // Enforce that this pipeline is always run in streaming mode.
+
     options.setStreaming(true);
-    ExampleUtils exampleUtils = new ExampleUtils(options);
+    runGameStats(options);
+  }
+
+  static void runGameStats(Options options) throws IOException {
+
     Pipeline pipeline = Pipeline.create(options);
 
+    // Using ExampleUtils to set up BigQuery resource.
+    ExampleUtils exampleUtils = new ExampleUtils(options);

Review comment:
       The comment specifies that the dataset must exist, so we shouldn't need to set up the BQ table here.

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
##########
@@ -170,7 +176,7 @@ public static void main(String[] args) throws Exception {
         // Extract and sum teamname/score pairs from the event data.
         .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
         .apply(
-            "WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), true));
+            "WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), false));

Review comment:
       Why are we changing the windowing?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
##########
@@ -124,6 +137,16 @@
     String getLeaderBoardTableName();
 
     void setLeaderBoardTableName(String value);
+
+    @Description("Path to the data file(s) containing game data.")
+    /* The default maps to a small Google Cloud Storage file (each ~8MB)
+
+    Note: You may want to use a small sample dataset to test it locally/quickly : gs://apache-beam-samples/game/small/gaming_data.csv
+    You can also download it via the command line gsutil cp gs://apache-beam-samples/game/small/gaming_data.csv ./destination_folder/gaming_data.csv */
+    @Default.String("gs://apache-beam-samples/game/small/gaming_data.csv")
+    String getInput();
+
+    void setInput(String value);

Review comment:
       This example reads from Pub/Sub. We shouldn't need an input option here.

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreIT.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration Tests for {@link HourlyTeamScore}. */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreIT {
+  public static final String GAMING_DATA_CSV =

Review comment:
       Let's use the default input option

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreIT.java
##########
@@ -0,0 +1,83 @@
+package org.apache.beam.examples.complete.game;
+
+import static org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration Tests for {@link HourlyTeamScore}. */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_DIR = "gs://temp-storage-for-end-to-end-tests";
+  private static final String DEFAULT_OUTPUT_CHECKSUM = "f920742fd1b363d01b0a5a44c951c683ea348a47";
+  private HourlyTeamScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(HourlyTeamScoreOptions.class);
+  private static String projectId;
+
+  public interface HourlyTeamScoreOptions extends TestPipelineOptions, HourlyTeamScore.Options {}
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupPipelineOptions();
+  }
+
+  @Test
+  public void testE2EHourlyTeamScore() throws Exception {
+
+    HourlyTeamScore.runHourlyTeamScore(options);
+
+    assertThat(
+        new NumberedShardedFile(options.getOutput() + "*-of-*"),
+        fileContentsHaveChecksum(DEFAULT_OUTPUT_CHECKSUM));
+  }
+
+  private void setupPipelineOptions() {
+    options.as(GcpOptions.class).setProject(projectId);
+    options.setBlockOnRun(false);
+    options.setInput(GAMING_DATA_CSV);
+    options.setOutput(
+        FileSystems.matchNewResource(TEMP_STORAGE_DIR, true)
+            .resolve(
+                String.format("hourlyteamscore-it-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),

Review comment:
       Nit:
   ```suggestion
                   String.format("HourlyTeamScoreIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
   ```
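
The format string in the suggestion above relies on `%<`, which reuses the argument of the previous format specifier, so a single `Date` fills every time field. A minimal standalone sketch of the resulting name follows; the class name `OutputNameSketch` and the helper `uniqueName` are hypothetical and not part of the PR, and `Locale.ROOT` is an added assumption to keep the digits locale-independent.

```java
import java.util.Date;
import java.util.Locale;

public class OutputNameSketch {
  // Hypothetical helper: builds a name like "<prefix>-yyyy-MM-dd-HH-mm-ss-SSS".
  static String uniqueName(String prefix, Date now) {
    // %tF expands to the ISO date (yyyy-MM-dd). Each "%<" reuses the argument
    // of the previous specifier, so the same Date supplies the hour, minute,
    // second and millisecond fields.
    return String.format(Locale.ROOT, "%s-%tF-%<tH-%<tM-%<tS-%<tL", prefix, now);
  }

  public static void main(String[] args) {
    // Prints the prefix followed by the current local date and time fields.
    System.out.println(uniqueName("HourlyTeamScoreIT", new Date()));
  }
}
```

Because the millisecond field is included, two runs started in the same second still get different output directories in most cases.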

##########
File path: examples/java/build.gradle
##########
@@ -91,10 +91,18 @@ dependencies {
   implementation "org.apache.commons:commons-lang3:3.9"
   implementation "org.apache.httpcomponents:httpclient:4.5.13"
   implementation "org.apache.httpcomponents:httpcore:4.4.13"
+  implementation project(path: ":runners:flink:1.11")

Review comment:
       Why do the examples need a Flink dependency?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
##########
@@ -115,18 +115,27 @@
   public static void main(String[] args) throws Exception {
 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    ExampleUtils exampleUtils = new ExampleUtils(options);
+    runStatefulTeamScore(options);
+  }
+
+  static void runStatefulTeamScore(Options options) throws IOException {
+
     Pipeline pipeline = Pipeline.create(options);
 
+    // Using ExampleUtils to set up BigQuery resource.
+    ExampleUtils exampleUtils = new ExampleUtils(options);
+    exampleUtils.setupBigQueryTable();

Review comment:
       Same question about the table here

##########
File path: runners/flink/flink_runner.gradle
##########
@@ -314,6 +314,8 @@ tasks.register("examplesIntegrationTest", Test) {
       excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
       // TODO (BEAM-14019) Fix integration Tests to run with FlinkRunner: Error deleting table, Not found: Dataset
       excludeTestsMatching 'org.apache.beam.examples.cookbook.BigQueryTornadoesIT.testE2eBigQueryTornadoesWithStorageApiUsingQuery'
+      // TODO Fix GameStats Example

Review comment:
       Can you leave more info about what's failing?

##########
File path: runners/direct-java/build.gradle
##########
@@ -218,6 +218,8 @@ task examplesIntegrationTest(type: Test) {
       // TODO (BEAM-14019) Fix integration Tests to run with DirectRunner: Timeout error
       excludeTestsMatching 'org.apache.beam.examples.complete.TfIdfIT'
       excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
+      // TODO Fix GameStats Example
+      excludeTestsMatching 'org.apache.beam.examples.complete.game.GameStatsIT'

Review comment:
       Can you leave more info about what's failing?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
##########
@@ -195,17 +218,25 @@ public static void main(String[] args) throws Exception {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     // Enforce that this pipeline is always run in streaming mode.
     options.setStreaming(true);
-    ExampleUtils exampleUtils = new ExampleUtils(options);
+    runLeaderBoard(options);
+  }
+
+  static void runLeaderBoard(Options options) throws IOException {
+
     Pipeline pipeline = Pipeline.create(options);
 
+    // Using ExampleUtils to set up BigQuery resource.
+    ExampleUtils exampleUtils = new ExampleUtils(options);

Review comment:
       Same comment as before: we shouldn't need to set up the BQ table here.

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/UserScoreIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link UserScore}. */
+@RunWith(JUnit4.class)
+public class UserScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";

Review comment:
       Yes, that works

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
##########
@@ -242,13 +242,20 @@ public static void main(String[] args) throws Exception {
     ExampleUtils exampleUtils = new ExampleUtils(options);
     Pipeline pipeline = Pipeline.create(options);
 
+    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
+    // command line.
+    PipelineResult result = runGameStats(options, pipeline);
+    exampleUtils.waitToFinish(result);
+  }
+
+  static PipelineResult runGameStats(Options options, Pipeline pipeline) {
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents =
         pipeline
             .apply(
                 PubsubIO.readStrings()
-                    .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
-                    .fromTopic(options.getTopic()))
+                    .fromSubscription(options.getSubscription())

Review comment:
       Can we just delete the topic in the cleanup of the integration test? It looks like a subscription gets deleted if it's inactive. I see this on some subscriptions in the project:
   
   Subscription expiration
   Subscription expires in 31 days if there is no activity.

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+  private static final String EVENTS_TOPIC_NAME = "events";
+  public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+  private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+  public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+      "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+  private GameStatsOptions options =
+      TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+  private static String pubsubEndpoint;
+  private @Nullable ManagedChannel channel = null;
+  private @Nullable TransportChannelProvider channelProvider = null;
+  private @Nullable TopicAdminClient topicAdmin = null;
+  private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+  private @Nullable TopicPath eventsTopicPath = null;
+  private @Nullable SubscriptionPath subscriptionPath = null;
+  private String projectId;
+  private static final String TOPIC_PREFIX = "gamestats-";
+  private BigqueryClient bqClient;
+  private final String OUTPUT_DATASET = "game_stats_e2e";
+
+  public interface GameStatsOptions extends TestPipelineOptions, GameStats.Options {};
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupBigQuery();

Review comment:
       I meant that instead of writing your own setupPubSub and setupBigQuery, you can use the setup in ExampleUtils, which does exactly that. It also has a waitToFinish method that tears down the Pub/Sub and BQ resources after the pipeline finishes running.
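
The lifecycle described in this comment (set up the resources, run the pipeline, tear the resources down afterwards) can be illustrated with a small standalone sketch. It does not use Beam: `ResourceHelper` and `runWithResources` are hypothetical stand-ins, and nothing here is meant to describe how ExampleUtils is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
  /** Hypothetical stand-in for a helper that owns external test resources. */
  interface ResourceHelper {
    void setup();

    void teardown();
  }

  /** Runs the body between setup and teardown; teardown runs even if the body throws. */
  static void runWithResources(ResourceHelper helper, Runnable pipelineBody) {
    helper.setup();
    try {
      pipelineBody.run();
    } finally {
      helper.teardown();
    }
  }

  /** Records the order of the calls so the lifecycle can be inspected. */
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    ResourceHelper helper =
        new ResourceHelper() {
          @Override
          public void setup() {
            events.add("setup");
          }

          @Override
          public void teardown() {
            events.add("teardown");
          }
        };
    runWithResources(helper, () -> events.add("run"));
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Keeping setup and teardown in one helper means the test itself only supplies names through the options and runs the pipeline.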

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreIT.java
##########
@@ -0,0 +1,83 @@
+package org.apache.beam.examples.complete.game;
+
+import static org.apache.beam.sdk.testing.FileChecksumMatcher.fileContentsHaveChecksum;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.NumberedShardedFile;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration Tests for {@link HourlyTeamScore}. */
+@RunWith(JUnit4.class)
+public class HourlyTeamScoreIT {
+  public static final String GAMING_DATA_CSV =
+      "gs://apache-beam-samples/game/small/gaming_data.csv";
+  public static final String TEMP_STORAGE_DIR = "gs://temp-storage-for-end-to-end-tests";
+  private static final String DEFAULT_OUTPUT_CHECKSUM = "f920742fd1b363d01b0a5a44c951c683ea348a47";
+  private HourlyTeamScoreOptions options =
+      TestPipeline.testingPipelineOptions().as(HourlyTeamScoreOptions.class);
+  private static String projectId;
+
+  public interface HourlyTeamScoreOptions extends TestPipelineOptions, HourlyTeamScore.Options {}
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupPipelineOptions();
+  }
+
+  @Test
+  public void testE2EHourlyTeamScore() throws Exception {
+
+    HourlyTeamScore.runHourlyTeamScore(options);
+
+    assertThat(
+        new NumberedShardedFile(options.getOutput() + "*-of-*"),
+        fileContentsHaveChecksum(DEFAULT_OUTPUT_CHECKSUM));
+  }
+
+  private void setupPipelineOptions() {
+    options.as(GcpOptions.class).setProject(projectId);
+    options.setBlockOnRun(false);
+    options.setInput(GAMING_DATA_CSV);
+    options.setOutput(
+        FileSystems.matchNewResource(TEMP_STORAGE_DIR, true)

Review comment:
       This should be able to use the temp root:
   ```suggestion
           FileSystems.matchNewResource(options.getTempRoot(), true)
   ```

##########
File path: examples/java/src/test/java/org/apache/beam/examples/complete/game/GameStatsIT.java
##########
@@ -0,0 +1,248 @@
+package org.apache.beam.examples.complete.game;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.examples.complete.game.utils.GameConstants;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GameStats}. */
+@RunWith(JUnit4.class)
+public class GameStatsIT {
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+  private static final String EVENTS_TOPIC_NAME = "events";
+  public static final String GAME_STATS_TEAM_TABLE = "game_stats_team";
+  private static final Integer DEFAULT_ACK_DEADLINE_SECONDS = 60;
+  public static final String SELECT_COUNT_AS_TOTAL_QUERY =
+      "SELECT total_score FROM `%s.%s.%s` where team like(\"AmaranthKoala\")";
+  private GameStatsOptions options =
+      TestPipeline.testingPipelineOptions().as(GameStatsIT.GameStatsOptions.class);
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.fromOptions(options);
+  private static String pubsubEndpoint;
+  private @Nullable ManagedChannel channel = null;
+  private @Nullable TransportChannelProvider channelProvider = null;
+  private @Nullable TopicAdminClient topicAdmin = null;
+  private @Nullable SubscriptionAdminClient subscriptionAdmin = null;
+  private @Nullable TopicPath eventsTopicPath = null;
+  private @Nullable SubscriptionPath subscriptionPath = null;
+  private String projectId;
+  private static final String TOPIC_PREFIX = "gamestats-";
+  private BigqueryClient bqClient;
+  private final String OUTPUT_DATASET = "game_stats_e2e";
+
+  public interface GameStatsOptions extends TestPipelineOptions, GameStats.Options {};
+
+  @Before
+  public void setupTestEnvironment() throws Exception {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+    projectId = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+
+    setupBigQuery();

Review comment:
       You can specify the names through the options







[GitHub] [beam] fernando-wizeline commented on pull request #17015: [BEAM-12572] - Java examples should get continuously exercised on at least 2 runners

Posted by GitBox <gi...@apache.org>.
fernando-wizeline commented on pull request #17015:
URL: https://github.com/apache/beam/pull/17015#issuecomment-1063090527


   Hi @kileys!
   I added the missing integration tests here which complement the ones Benjamin added.
   Can you help me with a review?
   Thanks!

