You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/06/01 00:22:00 UTC

[beam] branch master updated: [BEAM-7412] shut down executor service in fn harness

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 107849c  [BEAM-7412] shut down executor service in fn harness
     new 0b58c88  Merge pull request #8722 from ibzib/gcs-options
107849c is described below

commit 107849c729f2ec1f9d23553062db9c87f8014f14
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Wed May 29 20:27:28 2019 -0700

    [BEAM-7412] shut down executor service in fn harness
---
 .../harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 751f9b2..6e8329d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -19,6 +19,7 @@ package org.apache.beam.fn.harness;
 
 import java.util.EnumMap;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
 import org.apache.beam.fn.harness.control.BeamFnControlClient;
@@ -166,6 +167,7 @@ public class FnHarness {
       OutboundObserverFactory outboundObserverFactory)
       throws Exception {
     IdGenerator idGenerator = IdGenerators.decrementingLongs();
+    ExecutorService executorService = options.as(GcsOptions.class).getExecutorService();
     // The logging client variable is not used per se, but during its lifetime (until close()) it
     // intercepts logging and sends it to the logging service.
     try (BeamFnLoggingClient logging =
@@ -203,9 +205,10 @@ public class FnHarness {
       JvmInitializers.runBeforeProcessing(options);
 
       LOG.info("Entering instruction processing loop");
-      control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
+      control.processInstructionRequests(executorService);
     } finally {
       System.out.println("Shutting SDK harness down.");
+      executorService.shutdown();
     }
   }
 }