You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/06/14 23:00:55 UTC

incubator-gobblin git commit: [GOBBLIN-512] Add containers to hold unchecked exceptions from closing resources in broker

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f7ea77eb9 -> a02073e9d


[GOBBLIN-512] Add containers to hold unchecked exceptions from closing resources in broker

Closes #2383 from autumnust/brokerguaranteecloseeverything


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a02073e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a02073e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a02073e9

Branch: refs/heads/master
Commit: a02073e9d5cc91f3f759483c3dad8e6ddf011a65
Parents: f7ea77e
Author: Lei Sun <au...@gmail.com>
Authored: Thu Jun 14 16:00:50 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Thu Jun 14 16:00:50 2018 -0700

----------------------------------------------------------------------
 .../gobblin/broker/DefaultBrokerCache.java       | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a02073e9/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
index 0c001f1..f034114 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
@@ -19,15 +19,19 @@ package org.apache.gobblin.broker;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
@@ -202,6 +206,7 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
    */
   public void close(ScopeWrapper<S> scope)
       throws IOException {
+    List<Throwable> exceptionsList = Lists.newArrayList();
     List<Service> awaitShutdown = Lists.newArrayList();
 
     for (Map.Entry<RawJobBrokerKey, Object> entry : Maps.filterKeys(this.sharedResourceCache.asMap(),
@@ -211,8 +216,12 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
 
       if (entry.getValue() instanceof ResourceInstance) {
         Object obj = ((ResourceInstance) entry.getValue()).getResource();
-
-        SharedResourcesBrokerUtils.shutdownObject(obj, log);
+        // Catch unchecked exceptions while closing resources to make sure all resources managed by the cache are closed.
+        try {
+          SharedResourcesBrokerUtils.shutdownObject(obj, log);
+        } catch (Throwable t) {
+          exceptionsList.add(t);
+        }
         if (obj instanceof Service) {
           awaitShutdown.add((Service) obj);
         }
@@ -226,6 +235,12 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
         log.error("Failed to shutdown {}.", service);
       }
     }
+
+    // Log any exceptions collected while closing resources.
+    if (exceptionsList.size() > 0) {
+      log.error(exceptionsList.stream()
+          .map(Throwables::getStackTraceAsString).collect(Collectors.joining("\n")));
+    }
   }
 
   /**