You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/04/29 17:40:01 UTC
[beam] branch release-2.21.0 updated: Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new 9364bba Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown
9364bba is described below
commit 9364bba9e1ac60b846c3045d3dac13dc60dfa811
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Apr 28 22:38:44 2020 +0200
Merge pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown
Backport of #11537 / 6eb2c6e27e011d206fd3116b301ed11275ec415a.
---
...BeamFileSystemLegacyArtifactStagingService.java | 4 +--
.../environment/ExternalEnvironmentFactory.java | 36 ++++++++++++++--------
sdks/python/apache_beam/utils/subprocess_server.py | 3 +-
3 files changed, 26 insertions(+), 17 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java
index fb94915..f7e09e6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemLegacyArtifactStagingService.java
@@ -111,9 +111,7 @@ public class BeamFileSystemLegacyArtifactStagingService
FileSystems.delete(Collections.singletonList(manifestResourceId));
}
- LOG.debug("Removing empty dir: {}", dir);
- FileSystems.delete(Collections.singletonList(dir));
- LOG.info("Removed dir {}", dir);
+ FileSystems.delete(Collections.singletonList(dir), StandardMoveOptions.IGNORE_MISSING_FILES);
}
@Override
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
index 94e33c6..46a6efc 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.fnexecution.environment;
import java.time.Duration;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
@@ -34,6 +35,7 @@ import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,9 +109,10 @@ public class ExternalEnvironmentFactory implements EnvironmentFactory {
.build();
LOG.debug("Requesting worker ID {}", workerId);
+ final ManagedChannel managedChannel =
+ ManagedChannelFactory.createDefault().forDescriptor(externalPayload.getEndpoint());
BeamFnApi.StartWorkerResponse startWorkerResponse =
- BeamFnExternalWorkerPoolGrpc.newBlockingStub(
- ManagedChannelFactory.createDefault().forDescriptor(externalPayload.getEndpoint()))
+ BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel)
.startWorker(startWorkerRequest);
if (!startWorkerResponse.getError().isEmpty()) {
throw new RuntimeException(startWorkerResponse.getError());
@@ -127,6 +130,7 @@ public class ExternalEnvironmentFactory implements EnvironmentFactory {
workerId);
} catch (InterruptedException interruptEx) {
Thread.currentThread().interrupt();
+ managedChannel.shutdownNow();
throw new RuntimeException(interruptEx);
}
}
@@ -145,17 +149,23 @@ public class ExternalEnvironmentFactory implements EnvironmentFactory {
@Override
public void close() throws Exception {
- finalInstructionHandler.close();
- BeamFnApi.StopWorkerRequest stopWorkerRequest =
- BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
- LOG.debug("Closing worker ID {}", workerId);
- BeamFnApi.StopWorkerResponse stopWorkerResponse =
- BeamFnExternalWorkerPoolGrpc.newBlockingStub(
- ManagedChannelFactory.createDefault()
- .forDescriptor(externalPayload.getEndpoint()))
- .stopWorker(stopWorkerRequest);
- if (!stopWorkerResponse.getError().isEmpty()) {
- throw new RuntimeException(stopWorkerResponse.getError());
+ try {
+ finalInstructionHandler.close();
+ BeamFnApi.StopWorkerRequest stopWorkerRequest =
+ BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
+ LOG.debug("Closing worker ID {}", workerId);
+ BeamFnApi.StopWorkerResponse stopWorkerResponse =
+ BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel)
+ .stopWorker(stopWorkerRequest);
+ if (!stopWorkerResponse.getError().isEmpty()) {
+ throw new RuntimeException(stopWorkerResponse.getError());
+ }
+ } finally {
+ managedChannel.shutdown();
+ managedChannel.awaitTermination(10, TimeUnit.SECONDS);
+ if (!managedChannel.isTerminated()) {
+ managedChannel.shutdownNow();
+ }
}
}
};
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index d9c5085..29307ad 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -91,7 +91,8 @@ class SubprocessServer(object):
def log_stdout():
line = self._process.stdout.readline()
while line:
- _LOGGER.info(line)
+ # Remove newline via rstrip() to not print an empty line
+ _LOGGER.info(line.rstrip())
line = self._process.stdout.readline()
t = threading.Thread(target=log_stdout)