Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/27 17:10:18 UTC

[GitHub] [beam] mxm opened a new pull request #11537: [BEAM-6661]

mxm opened a new pull request #11537:
URL: https://github.com/apache/beam/pull/11537


   ### [BEAM-6661] Properly close channel for external environment
   
   This avoids warnings like the following:
   
   ```
   org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
   SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=628, target=localhost:41409} was not shutdown properly!!! ~*~*~*
   Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
   java.lang.RuntimeException: ManagedChannel allocation site
   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:410)
   at org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:44)
   at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:108)
   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:154)
   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:137)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4992)
   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:162)
   ```
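The warning above asks callers to invoke shutdown()/shutdownNow() and then wait on awaitTermination(). A minimal sketch of the try/finally shape this PR adopts follows. It is not the Beam code: `java.util.concurrent.ExecutorService` is used as a stand-in for the gRPC `ManagedChannel` (both expose `shutdown()`, `shutdownNow()` and `awaitTermination(long, TimeUnit)`), and `CloseInFinallySketch` and `StopRequest` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Stand-in sketch: an ExecutorService plays the role of the gRPC ManagedChannel,
 * because both share the shutdown()/awaitTermination() lifecycle methods.
 */
class CloseInFinallySketch {

  /** Hypothetical stop request; in the PR this role is played by the stopWorker RPC. */
  interface StopRequest {
    void run() throws Exception;
  }

  /** Runs the stop request and releases the resource even if the request throws. */
  static void close(ExecutorService channelStandIn, StopRequest stopRequest) throws Exception {
    try {
      stopRequest.run();
    } finally {
      // Always executed, so the resource is never left orphaned.
      channelStandIn.shutdown();
      channelStandIn.awaitTermination(10, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService channelStandIn = Executors.newSingleThreadExecutor();
    try {
      close(channelStandIn, () -> {
        throw new RuntimeException("worker reported an error");
      });
    } catch (RuntimeException e) {
      System.out.println("close failed: " + e.getMessage());
    }
    System.out.println("terminated: " + channelStandIn.isTerminated());
  }
}
```

The point of the `finally` block is that the error from the stop request still propagates to the caller, while the shutdown happens regardless.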
   
   ### [BEAM-6661] Avoid FileNotFoundException when no files have been staged 
   
   ### [BEAM-6661] Avoid printing empty lines in subprocess job server
   9a38a6d
   
   Previously, the subprocess job server printed an extra empty line after every
   logged line, making the output needlessly verbose, e.g.
   
   Before:
   ```
   logline1
   
   logline2
   
   ```
   
   After:
   ```
   logline1
   logline2
   ```
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11537:
URL: https://github.com/apache/beam/pull/11537#issuecomment-621321251


   > Btw, I'm seeing this one all the time now (both local and on Jenkins):
   
   Yeah, I also noticed yet another new one recently. I filed [BEAM-9853](https://issues.apache.org/jira/browse/BEAM-9853) to track this kind of bug, since it seems cleaning them up is going to take an ongoing effort.





[GitHub] [beam] ibzib commented on pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #11537:
URL: https://github.com/apache/beam/pull/11537#issuecomment-620864491


   > @ibzib Maybe also worth backporting?
   
   Might as well. `SEVERE: *~*~*~ Beam is horribly broken! *~*~*~` definitely tends to frighten new users.





[GitHub] [beam] mxm commented on pull request #11537: [BEAM-6661]

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11537:
URL: https://github.com/apache/beam/pull/11537#issuecomment-620138249


   Run Python PreCommit





[GitHub] [beam] mxm commented on pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11537:
URL: https://github.com/apache/beam/pull/11537#issuecomment-620841780


   Merging. We can follow up on the nits if we feel like it, since they are very minor.





[GitHub] [beam] ibzib commented on a change in pull request #11537: [BEAM-6661]

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11537:
URL: https://github.com/apache/beam/pull/11537#discussion_r416024189



##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
##########
@@ -145,17 +149,23 @@ public InstructionRequestHandler getInstructionRequestHandler() {
 
       @Override
       public void close() throws Exception {
-        finalInstructionHandler.close();
-        BeamFnApi.StopWorkerRequest stopWorkerRequest =
-            BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
-        LOG.debug("Closing worker ID {}", workerId);
-        BeamFnApi.StopWorkerResponse stopWorkerResponse =
-            BeamFnExternalWorkerPoolGrpc.newBlockingStub(
-                    ManagedChannelFactory.createDefault()
-                        .forDescriptor(externalPayload.getEndpoint()))
-                .stopWorker(stopWorkerRequest);
-        if (!stopWorkerResponse.getError().isEmpty()) {
-          throw new RuntimeException(stopWorkerResponse.getError());
+        try {
+          finalInstructionHandler.close();
+          BeamFnApi.StopWorkerRequest stopWorkerRequest =
+              BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
+          LOG.debug("Closing worker ID {}", workerId);
+          BeamFnApi.StopWorkerResponse stopWorkerResponse =
+              BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel)
+                  .stopWorker(stopWorkerRequest);
+          if (!stopWorkerResponse.getError().isEmpty()) {
+            throw new RuntimeException(stopWorkerResponse.getError());
+          }
+        } finally {
+          managedChannel.shutdown();
+          managedChannel.awaitTermination(10, TimeUnit.SECONDS);

Review comment:
       Nit: `awaitTermination` already returns whether the channel has terminated, so there is no need to call `isTerminated` afterwards.
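
The nit above can be illustrated with a small sketch that branches directly on the boolean returned by `awaitTermination`. As an assumption for runnability, `java.util.concurrent.ExecutorService` stands in for the gRPC `ManagedChannel`, which offers the same `shutdown()`, `shutdownNow()` and `awaitTermination(long, TimeUnit)` methods. `AwaitResultSketch` and `shutdownGracefully` are hypothetical names, and the `shutdownNow()` fallback is an illustration rather than something the PR is known to do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Stand-in sketch: ExecutorService is used in place of the gRPC ManagedChannel. */
class AwaitResultSketch {

  /** Returns true if the stand-in terminated in time, false if it had to be forced. */
  static boolean shutdownGracefully(ExecutorService channelStandIn, long timeout, TimeUnit unit)
      throws InterruptedException {
    channelStandIn.shutdown();
    // awaitTermination reports the outcome itself; no separate isTerminated() call is needed.
    if (channelStandIn.awaitTermination(timeout, unit)) {
      return true;
    }
    channelStandIn.shutdownNow();
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService idle = Executors.newSingleThreadExecutor();
    System.out.println("idle terminated gracefully: "
        + shutdownGracefully(idle, 1, TimeUnit.SECONDS));

    ExecutorService busy = Executors.newSingleThreadExecutor();
    busy.execute(() -> {
      try {
        Thread.sleep(5000); // simulated long-running work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // shutdownNow() interrupts the sleep
      }
    });
    System.out.println("busy terminated gracefully: "
        + shutdownGracefully(busy, 50, TimeUnit.MILLISECONDS));
  }
}
```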

Review comment:
       Why 10 seconds? Does shutdown usually take that long?







[GitHub] [beam] mxm commented on a change in pull request #11537: [BEAM-6661]

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11537:
URL: https://github.com/apache/beam/pull/11537#discussion_r416035621



##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
##########
@@ -145,17 +149,23 @@ public InstructionRequestHandler getInstructionRequestHandler() {
 
       @Override
       public void close() throws Exception {
-        finalInstructionHandler.close();
-        BeamFnApi.StopWorkerRequest stopWorkerRequest =
-            BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
-        LOG.debug("Closing worker ID {}", workerId);
-        BeamFnApi.StopWorkerResponse stopWorkerResponse =
-            BeamFnExternalWorkerPoolGrpc.newBlockingStub(
-                    ManagedChannelFactory.createDefault()
-                        .forDescriptor(externalPayload.getEndpoint()))
-                .stopWorker(stopWorkerRequest);
-        if (!stopWorkerResponse.getError().isEmpty()) {
-          throw new RuntimeException(stopWorkerResponse.getError());
+        try {
+          finalInstructionHandler.close();
+          BeamFnApi.StopWorkerRequest stopWorkerRequest =
+              BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build();
+          LOG.debug("Closing worker ID {}", workerId);
+          BeamFnApi.StopWorkerResponse stopWorkerResponse =
+              BeamFnExternalWorkerPoolGrpc.newBlockingStub(managedChannel)
+                  .stopWorker(stopWorkerRequest);
+          if (!stopWorkerResponse.getError().isEmpty()) {
+            throw new RuntimeException(stopWorkerResponse.getError());
+          }
+        } finally {
+          managedChannel.shutdown();
+          managedChannel.awaitTermination(10, TimeUnit.SECONDS);

Review comment:
       No, it usually takes ~1 second. 10 seconds is just the maximum wait time, which we also use in a number of other places.
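
To make the distinction between typical and maximum wait concrete, here is a small sketch showing that `awaitTermination` returns as soon as termination completes, so the timeout only acts as an upper bound. As an assumption, `java.util.concurrent.ExecutorService` stands in for the gRPC `ManagedChannel` (the method has the same signature on both); `TimeoutUpperBoundSketch` and its helpers are hypothetical names, and the 200 ms task is an arbitrary stand-in for shutdown work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Stand-in sketch: ExecutorService is used in place of the gRPC ManagedChannel. */
class TimeoutUpperBoundSketch {

  /** Shuts the stand-in down and returns how many milliseconds the wait actually took. */
  static long timedShutdownMillis(ExecutorService channelStandIn, long timeoutSeconds)
      throws InterruptedException {
    long start = System.nanoTime();
    channelStandIn.shutdown();
    // Returns as soon as termination completes; the timeout is only the upper bound.
    channelStandIn.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  /** Sleeps for the given time, restoring the interrupt flag if interrupted. */
  static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService channelStandIn = Executors.newSingleThreadExecutor();
    channelStandIn.execute(() -> sleepQuietly(200)); // simulated in-flight work
    long elapsed = timedShutdownMillis(channelStandIn, 10);
    System.out.println("waited about " + elapsed + " ms of a 10 s budget");
  }
}
```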







[GitHub] [beam] mxm commented on pull request #11537: [BEAM-6661] Get rid of a few logging annoyances for execution and shutdown

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11537:
URL: https://github.com/apache/beam/pull/11537#issuecomment-620842028


   @ibzib Maybe also worth backporting?

