You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/15 18:29:39 UTC

[GitHub] [beam] xinyuiscool commented on a change in pull request #13550: [BEAM-11458] Backport Samza Runner Changes

xinyuiscool commented on a change in pull request #13550:
URL: https://github.com/apache/beam/pull/13550#discussion_r543553223



##########
File path: runners/samza/build.gradle
##########
@@ -64,6 +70,7 @@ dependencies {
   compile "org.apache.kafka:kafka-clients:0.11.0.2"
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   testCompile project(path: ":runners:core-java", configuration: "testRuntime")
+  testCompile library.java.commons_lang3

Review comment:
       We already have commons_lang3 on the compile classpath, so I think we can remove this.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##########
@@ -100,17 +94,46 @@ protected JobInvocation invokeWithExecutor(
         InMemoryJobService.DEFAULT_MAX_INVOCATION_HISTORY);
   }
 
+  private ExpansionServer createExpansionService(String host, int expansionPort)
+      throws IOException {
+    if (host == null) {
+      host = InetAddress.getLoopbackAddress().getHostName();
+    }
+    ExpansionServer expansionServer =
+        ExpansionServer.create(new ExpansionService(), host, expansionPort);
+    LOG.info(
+        "Java ExpansionService started on {}:{}",
+        expansionServer.getHost(),
+        expansionServer.getPort());
+    return expansionServer;
+  }
+
   public void run() throws Exception {
-    final InMemoryJobService service = createJobService(pipelineOptions);
-    final GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer =
+    // Create services
+    InMemoryJobService service = createJobService();

Review comment:
       Please make all of these variables final (this one and the ones below).

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/BeamJobCoordinatorRunner.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.time.Duration;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.clustermanager.JobCoordinatorLaunchUtil;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.runtime.ApplicationRunner;
+
+/** Runs on Yarn AM, execute planning and launches JobCoordinator. */
+public class BeamJobCoordinatorRunner implements ApplicationRunner {

Review comment:
       This needs a better name. I suggest using a new package, e.g. `...runners.samza.cluster`, and removing "Beam" from the class name, since it is confusing alongside the other runners.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##########
@@ -37,16 +38,17 @@
 import org.slf4j.LoggerFactory;
 
 /** Driver program that starts a job server. */
-// TODO extend JobServerDriver
+// TODO(BEAM-8510): extend JobServerDriver
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class SamzaJobServerDriver {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
 
   private final SamzaPortablePipelineOptions pipelineOptions;
+  private ExpansionServer expansionServer;

Review comment:
       It seems this can just be a local variable instead of being held as a member field.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
##########
@@ -439,16 +438,14 @@ public SystemAdmin getAdmin(String systemName, Config config) {
       return source;
     }
 
-    @SuppressWarnings("unchecked")
-    private static <T> Coder<WindowedValue<T>> getCoder(Config config) {
-      return Base64Serializer.deserializeUnchecked(config.get("coder"), Coder.class);
-    }
-
-    private static SamzaPipelineOptions getPipelineOptions(Config config) {
-      return Base64Serializer.deserializeUnchecked(
-              config.get("beamPipelineOptions"), SerializablePipelineOptions.class)
-          .get()
-          .as(SamzaPipelineOptions.class);
+    static SamzaPipelineOptions getPipelineOptions(Config config) {

Review comment:
       Can we keep this private? Otherwise, please move it to a util class.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
##########
@@ -178,7 +179,11 @@ public SamzaExecutionContext create(
 
       final MetricsRegistryMap metricsRegistry =
           (MetricsRegistryMap) containerContext.getContainerMetricsRegistry();
-      SamzaExecutionContext.this.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+      SamzaMetricsContainer samzaMetricsContainer =

Review comment:
       @Zhangyx39 previously discussed this change, along with the changes in MetricsEnvironment.java, in the open-source community. Please sync up with him about how we want to proceed, and keep the metrics change out of the scope of this PR.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -363,20 +365,32 @@ public void run() {
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
+          long watermarkMillis = Long.MAX_VALUE;
           for (UnboundedReader reader : readers) {
             final SystemStreamPartition ssp = readerToSsp.get(reader);
             final Instant currentWatermark =
                 currentWatermarks.containsKey(ssp)
                     ? currentWatermarks.get(ssp)
                     : BoundedWindow.TIMESTAMP_MIN_VALUE;
             final Instant nextWatermark = reader.getWatermark();
+            if (nextWatermark != null) {
+              watermarkMillis = Math.min(nextWatermark.getMillis(), watermarkMillis);
+            }
             if (currentWatermark.isBefore(nextWatermark)) {
               currentWatermarks.put(ssp, nextWatermark);
-              enqueueWatermark(reader);
+              if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(nextWatermark)) {
+                enqueueWatermark(reader);
+              } else {
+                // Max watermark has been reached for this reader.
+                enqueueMaxWatermarkAndEndOfStream(reader);
+              }
             }
           }
 
           lastWatermarkTime = time;
+          if (watermarkMillis == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {

Review comment:
       I think the existing check of `nextWatermark` inside the loop should be good enough.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -363,20 +365,32 @@ public void run() {
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
+          long watermarkMillis = Long.MAX_VALUE;

Review comment:
       This variable doesn't seem to add much value. We should be able to tell the watermark time from `nextWatermark`.

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -363,20 +365,32 @@ public void run() {
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
+          long watermarkMillis = Long.MAX_VALUE;
           for (UnboundedReader reader : readers) {
             final SystemStreamPartition ssp = readerToSsp.get(reader);
             final Instant currentWatermark =
                 currentWatermarks.containsKey(ssp)
                     ? currentWatermarks.get(ssp)
                     : BoundedWindow.TIMESTAMP_MIN_VALUE;
             final Instant nextWatermark = reader.getWatermark();
+            if (nextWatermark != null) {
+              watermarkMillis = Math.min(nextWatermark.getMillis(), watermarkMillis);
+            }
             if (currentWatermark.isBefore(nextWatermark)) {
               currentWatermarks.put(ssp, nextWatermark);
-              enqueueWatermark(reader);
+              if (BoundedWindow.TIMESTAMP_MAX_VALUE.isAfter(nextWatermark)) {
+                enqueueWatermark(reader);
+              } else {
+                // Max watermark has been reached for this reader.
+                enqueueMaxWatermarkAndEndOfStream(reader);

Review comment:
       Shall we add `running = false` here?

##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
##########
@@ -268,6 +269,7 @@ public void register(SystemStreamPartition ssp, String offset) {
       private final FnWithMetricsWrapper metricsWrapper;
 
       private volatile boolean running;
+      private volatile boolean maxWatermarkReached = false;

Review comment:
       This flag seems a bit hard to follow, since we already have the `running` flag to control the loop.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
##########
@@ -53,6 +54,12 @@
 
   private static final ThreadLocal<@Nullable MetricsContainer> CONTAINER_FOR_THREAD =
       new ThreadLocal<>();
+  private static final AtomicReference<MetricsContainer> CONTAINER_GLOBAL = new AtomicReference<>();

Review comment:
       As mentioned above, please exclude this change from the PR and discuss it with @Zhangyx39.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org