You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/14 23:49:06 UTC

[GitHub] [beam] kw2542 opened a new pull request #13550: [BEAM-11458] Backport Samza Runner Changes

kw2542 opened a new pull request #13550:
URL: https://github.com/apache/beam/pull/13550


   1. Samza 1.5 support
   2. Prepare for Async ParDo (BEAM-6550)
   3. Memory usage optimization for event time timers
   4. TestStream support
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam
 .apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.a
 pache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam
 .apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/be
 am_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] Zhangyx39 commented on a change in pull request #13550: [BEAM-11458] Backport Samza Runner Changes

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #13550:
URL: https://github.com/apache/beam/pull/13550#discussion_r543710544



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
##########
@@ -178,7 +179,11 @@ public SamzaExecutionContext create(
 
       final MetricsRegistryMap metricsRegistry =
           (MetricsRegistryMap) containerContext.getContainerMetricsRegistry();
-      SamzaExecutionContext.this.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+      SamzaMetricsContainer samzaMetricsContainer =

Review comment:
       There are multiple metrics related changes. The ones that are related to global metrics container doesn't need to push to open source. For now, we can exclude all metrics related commits from this PR. We can do that in a separate PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool commented on a change in pull request #13550: [BEAM-11458] Backport Samza Runner Changes

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on a change in pull request #13550:
URL: https://github.com/apache/beam/pull/13550#discussion_r543553223



##########
File path: runners/samza/build.gradle
##########
@@ -64,6 +70,7 @@ dependencies {
   compile "org.apache.kafka:kafka-clients:0.11.0.2"
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   testCompile project(path: ":runners:core-java", configuration: "testRuntime")
+  testCompile library.java.commons_lang3

Review comment:
       we already have commons_lang3 in the compile path. I think we can remove this.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##########
@@ -100,17 +94,46 @@ protected JobInvocation invokeWithExecutor(
         InMemoryJobService.DEFAULT_MAX_INVOCATION_HISTORY);
   }
 
+  private ExpansionServer createExpansionService(String host, int expansionPort)
+      throws IOException {
+    if (host == null) {
+      host = InetAddress.getLoopbackAddress().getHostName();
+    }
+    ExpansionServer expansionServer =
+        ExpansionServer.create(new ExpansionService(), host, expansionPort);
+    LOG.info(
+        "Java ExpansionService started on {}:{}",
+        expansionServer.getHost(),
+        expansionServer.getPort());
+    return expansionServer;
+  }
+
   public void run() throws Exception {
-    final InMemoryJobService service = createJobService(pipelineOptions);
-    final GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer =
+    // Create services
+    InMemoryJobService service = createJobService();

Review comment:
       final for all the vars (this and below).

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/BeamJobCoordinatorRunner.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.time.Duration;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.clustermanager.JobCoordinatorLaunchUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.runtime.ApplicationRunner;
+
+/** Runs on Yarn AM, execute planning and launches JobCoordinator. */
+public class BeamJobCoordinatorRunner implements ApplicationRunner {

Review comment:
       Better naming for this. Suggest using a new package,e.g. ...runners.samza.cluster., and remove Beam from the class name as it's confusing for other runners.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##########
@@ -37,16 +38,17 @@
 import org.slf4j.LoggerFactory;
 
 /** Driver program that starts a job server. */
-// TODO extend JobServerDriver
+// TODO(BEAM-8510): extend JobServerDriver
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class SamzaJobServerDriver {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
 
   private final SamzaPortablePipelineOptions pipelineOptions;
+  private ExpansionServer expansionServer;

Review comment:
       Seems this can just be a local var instead of holding it as member.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
##########
@@ -439,16 +438,14 @@ public SystemAdmin getAdmin(String systemName, Config config) {
       return source;
     }
 
-    @SuppressWarnings("unchecked")
-    private static <T> Coder<WindowedValue<T>> getCoder(Config config) {
-      return Base64Serializer.deserializeUnchecked(config.get("coder"), Coder.class);
-    }
-
-    private static SamzaPipelineOptions getPipelineOptions(Config config) {
-      return Base64Serializer.deserializeUnchecked(
-              config.get("beamPipelineOptions"), SerializablePipelineOptions.class)
-          .get()
-          .as(SamzaPipelineOptions.class);
+    static SamzaPipelineOptions getPipelineOptions(Config config) {

Review comment:
       keep this as private? Otherwise move it to a util class.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
##########
@@ -178,7 +179,11 @@ public SamzaExecutionContext create(
 
       final MetricsRegistryMap metricsRegistry =
           (MetricsRegistryMap) containerContext.getContainerMetricsRegistry();
-      SamzaExecutionContext.this.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+      SamzaMetricsContainer samzaMetricsContainer =

Review comment:
       @Zhangyx39 discussed this change, along with the changes in MetricsEnvironment.java, in open source before. Please sync up with him about how do we want to proceed. Please keep the metrics change out of the scope of this RB.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -363,20 +365,32 @@ public void run() {
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
+          long watermarkMillis = Long.MAX_VALUE;
           for (UnboundedReader reader : readers) {
             final SystemStreamPartition ssp = readerToSsp.get(reader);
             final Instant currentWatermark =
                 currentWatermarks.containsKey(ssp)
                     ? currentWatermarks.get(ssp)
                     : BoundedWindow.TIMESTAMP_MIN_VALUE;
             final Instant nextWatermark = reader.getWatermark();
+            if (nextWatermark != null) {
+              watermarkMillis = Math.min(nextWatermark.getMillis(), watermarkMillis);
+            }
             if (currentWatermark.isBefore(nextWatermark)) {
               currentWatermarks.put(ssp, nextWatermark);
-              enqueueWatermark(reader);
+              if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(nextWatermark)) {
+                enqueueWatermark(reader);
+              } else {
+                // Max watermark has been reached for this reader.
+                enqueueMaxWatermarkAndEndOfStream(reader);
+              }
             }
           }
 
           lastWatermarkTime = time;
+          if (watermarkMillis == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {

Review comment:
       I think using previous check of nextWatermark inside the loop should be good enough.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -363,20 +365,32 @@ public void run() {
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
+          long watermarkMillis = Long.MAX_VALUE;

Review comment:
       This var seems not adding too much value. We should be able to tell the watermark time from nextWatermark.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -363,20 +365,32 @@ public void run() {
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
+          long watermarkMillis = Long.MAX_VALUE;
           for (UnboundedReader reader : readers) {
             final SystemStreamPartition ssp = readerToSsp.get(reader);
             final Instant currentWatermark =
                 currentWatermarks.containsKey(ssp)
                     ? currentWatermarks.get(ssp)
                     : BoundedWindow.TIMESTAMP_MIN_VALUE;
             final Instant nextWatermark = reader.getWatermark();
+            if (nextWatermark != null) {
+              watermarkMillis = Math.min(nextWatermark.getMillis(), watermarkMillis);
+            }
             if (currentWatermark.isBefore(nextWatermark)) {
               currentWatermarks.put(ssp, nextWatermark);
-              enqueueWatermark(reader);
+              if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(nextWatermark)) {
+                enqueueWatermark(reader);
+              } else {
+                // Max watermark has been reached for this reader.
+                enqueueMaxWatermarkAndEndOfStream(reader);

Review comment:
       Shall we add
   
   running = false
   

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -268,6 +269,7 @@ public void register(SystemStreamPartition ssp, String offset) {
       private final FnWithMetricsWrapper metricsWrapper;
 
       private volatile boolean running;
+      private volatile boolean maxWatermarkReached = false;

Review comment:
       This flag seems a bit cumbersome to understand, as we already have running flag to control the looping.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
##########
@@ -53,6 +54,12 @@
 
   private static final ThreadLocal<@Nullable MetricsContainer> CONTAINER_FOR_THREAD =
       new ThreadLocal<>();
+  private static final AtomicReference<MetricsContainer> CONTAINER_GLOBAL = new AtomicReference<>();

Review comment:
       As mentioned above, please exclude this change from the pr and discuss with @Zhangyx39 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] Zhangyx39 commented on a change in pull request #13550: [BEAM-11458] Backport Samza Runner Changes

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #13550:
URL: https://github.com/apache/beam/pull/13550#discussion_r543730502



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -268,6 +269,7 @@ public void register(SystemStreamPartition ssp, String offset) {
       private final FnWithMetricsWrapper metricsWrapper;
 
       private volatile boolean running;
+      private volatile boolean maxWatermarkReached = false;

Review comment:
       This is needed to finish a nexmark test in streaming mode. As @xinyuiscool suggested, we can remove this flag and just use the running flag.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kw2542 commented on a change in pull request #13550: [BEAM-11458] Backport Samza Runner Changes

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #13550:
URL: https://github.com/apache/beam/pull/13550#discussion_r543727825



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -268,6 +269,7 @@ public void register(SystemStreamPartition ssp, String offset) {
       private final FnWithMetricsWrapper metricsWrapper;
 
       private volatile boolean running;
+      private volatile boolean maxWatermarkReached = false;

Review comment:
       I believe this is change added to support Nexmark. @Zhangyx39  can you comment more on this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] xinyuiscool merged pull request #13550: [BEAM-11458] Upgrade SamzRunner to Samza 1.5

Posted by GitBox <gi...@apache.org>.
xinyuiscool merged pull request #13550:
URL: https://github.com/apache/beam/pull/13550


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org