Posted to commits@gobblin.apache.org by su...@apache.org on 2021/10/13 21:17:00 UTC

[gobblin] branch master updated: [GOBBLIN-1556]Add shutdown logic in FsJobConfigurationManager (#3407)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a1b0b9e  [GOBBLIN-1556]Add shutdown logic in FsJobConfigurationManager (#3407)
a1b0b9e is described below

commit a1b0b9e2eda43e0f518a35ac211844ac73b4cdd6
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Oct 13 14:16:51 2021 -0700

    [GOBBLIN-1556]Add shutdown logic in FsJobConfigurationManager (#3407)
    
    * [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type
    
    * address comments
    
    * [GOBBLIN-1556]Add shutdown logic in FsJobConfigurationManager
---
 .../java/org/apache/gobblin/cluster/FsJobConfigurationManager.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
index 2563201..d1f5b11 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
@@ -88,6 +88,12 @@ public class FsJobConfigurationManager extends JobConfigurationManager {
     }, 0, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
   }
 
+  @Override
+  protected void shutDown() throws Exception {
+    ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(log));
+    super.shutDown();
+  }
+
   void fetchJobSpecs() throws ExecutionException, InterruptedException {
     List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
         (List<Pair<SpecExecutor.Verb, JobSpec>>) this.specConsumer.changedSpecs().get();
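
Note on the change above: the new shutDown() override stops fetchJobSpecExecutor before delegating to super.shutDown(), so the periodic fetch task scheduled in the preceding context lines no longer outlives the manager. The body of ExecutorsUtils.shutdownExecutorService is not shown in this diff, so the following is only a minimal sketch of the general pattern using plain JDK calls. The class PeriodicFetcher, its field names, and the 5-second wait are hypothetical and are not taken from Gobblin.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a service that polls on a fixed schedule. */
final class PeriodicFetcher {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger fetchCount = new AtomicInteger();
  private final long intervalMillis;

  PeriodicFetcher(long intervalMillis) {
    this.intervalMillis = intervalMillis;
  }

  /** Schedules the periodic task, analogous to the scheduling call in the diff context. */
  void start() {
    executor.scheduleAtFixedRate(
        fetchCount::incrementAndGet, 0, intervalMillis, TimeUnit.MILLISECONDS);
  }

  /** Stops the executor first; a real subclass would then call super.shutDown(). */
  void shutDown() throws InterruptedException {
    // Reject new work; by default this also cancels future periodic runs.
    executor.shutdown();
    // Give an in-flight run a bounded time to finish (assumed timeout).
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
      // Interrupt whatever is still running.
      executor.shutdownNow();
    }
  }

  int getFetchCount() {
    return fetchCount.get();
  }

  boolean isTerminated() {
    return executor.isTerminated();
  }

  public static void main(String[] args) throws InterruptedException {
    PeriodicFetcher fetcher = new PeriodicFetcher(10);
    fetcher.start();
    Thread.sleep(100);
    fetcher.shutDown();
    System.out.println("terminated=" + fetcher.isTerminated());
  }
}
```

Without the shutdown step, the non-daemon worker thread created by the scheduled executor would keep running after the owning service has stopped, which is the kind of leak this commit appears to address.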