You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/04/29 17:40:01 UTC

[beam] branch release-2.21.0 updated: Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new 9364bba  Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown
9364bba is described below

commit 9364bba9e1ac60b846c3045d3dac13dc60dfa811
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Apr 28 22:38:44 2020 +0200

    Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown
    
    Backport of #11537 / 6eb2c6e27e011d206fd3116b301ed11275ec415a.
---
 ...BeamFileSystemLegacyArtifactStagingService.java |  4 +--
 .../environment/ExternalEnvironmentFactory.java    | 36 ++++++++++++++--------
 sdks/python/apache_beam/utils/subprocess_server.py |  3 +-
 3 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java
index fb94915..f7e09e6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java
@@ -111,9 +111,7 @@ public class BeamFileSystemLegacyArtifactStagingService
       FileSystems.delete(Collections.singletonList(manifestResourceId));
     }
 
-    LOG.debug("Removing empty dir: {}", dir);
-    FileSystems.delete(Collections.singletonList(dir));
-    LOG.info("Removed dir {}", dir);
+    FileSystems.delete(Collections.singletonList(dir), StandardMoveOptions.IGNORE_MISSING_FILES);
   }
 
   @Override
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
index 94e33c6..46a6efc 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.fnexecution.environment;
 
 import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
@@ -34,6 +35,7 @@ import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,9 +109,10 @@ public class ExternalEnvironmentFactory implements EnvironmentFactory {
             .build();
 
     LOG.debug("Requesting worker ID {}", workerId);
+    final ManagedChannel managedChannel =
+        ManagedChannelFactory.createDefault().forDescriptor(externalPayload.getEndpoint());
     BeamFnApi.StartWorkerResponse startWorkerResponse =
-        BeamFnExternalWorkerPoolGrpc.newBlockingStub(
-                ManagedChannelFactory.createDefault().forDescriptor(externalPayload.getEndpoint()))
+        BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel)
             .startWorker(startWorkerRequest);
     if (!startWorkerResponse.getError().isEmpty()) {
       throw new RuntimeException(startWorkerResponse.getError());
@@ -127,6 +130,7 @@ public class ExternalEnvironmentFactory implements EnvironmentFactory {
             workerId);
       } catch (InterruptedException interruptEx) {
         Thread.currentThread().interrupt();
+        managedChannel.shutdownNow();
         throw new RuntimeException(interruptEx);
       }
     }
@@ -145,17 +149,23 @@ public class ExternalEnvironmentFactory implements EnvironmentFactory {
 
       @Override
       public void close() throws Exception {
-        finalInstructionHandler.close();
-        BeamFnApi.StopWorkerRequest stopWorkerRequest =
-            BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
-        LOG.debug("Closing worker ID {}", workerId);
-        BeamFnApi.StopWorkerResponse stopWorkerResponse =
-            BeamFnExternalWorkerPoolGrpc.newBlockingStub(
-                    ManagedChannelFactory.createDefault()
-                        .forDescriptor(externalPayload.getEndpoint()))
-                .stopWorker(stopWorkerRequest);
-        if (!stopWorkerResponse.getError().isEmpty()) {
-          throw new RuntimeException(stopWorkerResponse.getError());
+        try {
+          finalInstructionHandler.close();
+          BeamFnApi.StopWorkerRequest stopWorkerRequest =
+              BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
+          LOG.debug("Closing worker ID {}", workerId);
+          BeamFnApi.StopWorkerResponse stopWorkerResponse =
+              BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel)
+                  .stopWorker(stopWorkerRequest);
+          if (!stopWorkerResponse.getError().isEmpty()) {
+            throw new RuntimeException(stopWorkerResponse.getError());
+          }
+        } finally {
+          managedChannel.shutdown();
+          managedChannel.awaitTermination(10, TimeUnit.SECONDS);
+          if (!managedChannel.isTerminated()) {
+            managedChannel.shutdownNow();
+          }
         }
       }
     };
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index d9c5085..29307ad 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -91,7 +91,8 @@ class SubprocessServer(object):
         def log_stdout():
           line = self._process.stdout.readline()
           while line:
-            _LOGGER.info(line)
+            # Remove newline via rstrip() to not print an empty line
+            _LOGGER.info(line.rstrip())
             line = self._process.stdout.readline()
 
         t = threading.Thread(target=log_stdout)