You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/06/14 23:00:55 UTC
incubator-gobblin git commit: [GOBBLIN-512] Add containers to hold unchecked exceptions from closing resources in broker
Repository: incubator-gobblin
Updated Branches:
refs/heads/master f7ea77eb9 -> a02073e9d
[GOBBLIN-512] Add containers to hold unchecked exceptions from closing resources in broker
Closes #2383 from autumnust/brokerguaranteecloseeverything
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a02073e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a02073e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a02073e9
Branch: refs/heads/master
Commit: a02073e9d5cc91f3f759483c3dad8e6ddf011a65
Parents: f7ea77e
Author: Lei Sun <au...@gmail.com>
Authored: Thu Jun 14 16:00:50 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Thu Jun 14 16:00:50 2018 -0700
----------------------------------------------------------------------
.../gobblin/broker/DefaultBrokerCache.java | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a02073e9/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
index 0c001f1..f034114 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
@@ -19,15 +19,19 @@ package org.apache.gobblin.broker;
import java.io.Closeable;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
@@ -202,6 +206,7 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
*/
public void close(ScopeWrapper<S> scope)
throws IOException {
+ List<Throwable> exceptionsList = Lists.newArrayList();
List<Service> awaitShutdown = Lists.newArrayList();
for (Map.Entry<RawJobBrokerKey, Object> entry : Maps.filterKeys(this.sharedResourceCache.asMap(),
@@ -211,8 +216,12 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
if (entry.getValue() instanceof ResourceInstance) {
Object obj = ((ResourceInstance) entry.getValue()).getResource();
-
- SharedResourcesBrokerUtils.shutdownObject(obj, log);
+ // Catch unchecked exception while closing resources, make sure all resources managed by cache are closed.
+ try {
+ SharedResourcesBrokerUtils.shutdownObject(obj, log);
+ } catch (Throwable t) {
+ exceptionsList.add(t);
+ }
if (obj instanceof Service) {
awaitShutdown.add((Service) obj);
}
@@ -226,6 +235,12 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
log.error("Failed to shutdown {}.", service);
}
}
+
+ // log exceptions while closing resources up.
+ if (exceptionsList.size() > 0) {
+ log.error(exceptionsList.stream()
+ .map(Throwables::getStackTraceAsString).collect(Collectors.joining("\n")));
+ }
}
/**