You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/22 21:14:58 UTC
incubator-gobblin git commit: [GOBBLIN-435] Fix data publisher
created from job broker not closed
Repository: incubator-gobblin
Updated Branches:
refs/heads/master f51bf00b4 -> d323f6022
[GOBBLIN-435] Fix data publisher created from job broker not closed
Closes #2312 from zxcware/audit
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d323f602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d323f602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d323f602
Branch: refs/heads/master
Commit: d323f60222534ddff42beaf08716040246a999c3
Parents: f51bf00
Author: zhchen <zh...@linkedin.com>
Authored: Thu Mar 22 14:14:53 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 22 14:14:53 2018 -0700
----------------------------------------------------------------------
.../gobblin/publisher/DataPublisherFactory.java | 3 +-
.../publisher/DataPublisherFactoryTest.java | 39 +++++++++++++++-----
.../broker/SharedResourcesBrokerImpl.java | 4 ++
3 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
index 8d77fd6..ea228a7 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
@@ -78,6 +78,7 @@ public class DataPublisherFactory<S extends ScopeType<S>>
State state = key.getState();
Class<? extends DataPublisher> dataPublisherClass = (Class<? extends DataPublisher>) Class
.forName(publisherClassName);
+ log.info("Creating data publisher with class {} in scope {}. ", publisherClassName, config.getScope().toString());
DataPublisher publisher = DataPublisher.getInstance(dataPublisherClass, state);
@@ -97,6 +98,6 @@ public class DataPublisherFactory<S extends ScopeType<S>>
@Override
public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, DataPublisherKey> config) {
- return broker.selfScope().getType().rootScope();
+ return broker.selfScope().getType();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
index 6f58a50..5c65fc4 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
@@ -43,6 +43,9 @@ import org.apache.gobblin.capability.Capability;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import lombok.Getter;
+
+
/**
* Tests for DataPublisherFactory
*/
@@ -76,29 +79,41 @@ public class DataPublisherFactoryTest {
SharedResourcesBroker<SimpleScopeType> localBroker1 =
broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local1")).build();
- DataPublisher publisher1 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
- DataPublisher publisher2 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
+ TestThreadsafeDataPublisher publisher1 = (TestThreadsafeDataPublisher)DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
+ TestThreadsafeDataPublisher publisher2 = (TestThreadsafeDataPublisher)DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
// should get the same publisher
Assert.assertEquals(publisher1, publisher2);
- DataPublisher publisher3 =
- localBroker1.getSharedResource(new DataPublisherFactory<>(),
+ TestThreadsafeDataPublisher publisher3 =
+ (TestThreadsafeDataPublisher)localBroker1.getSharedResource(new DataPublisherFactory<>(),
new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null));
- // should get the same publisher
- Assert.assertEquals(publisher2, publisher3);
+ // should not get the same publisher
+ Assert.assertNotEquals(publisher2, publisher3);
- DataPublisher publisher4 =
- localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(),
+ TestThreadsafeDataPublisher publisher4 =
+ (TestThreadsafeDataPublisher)localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(),
new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null), SimpleScopeType.LOCAL);
- // should get a different publisher
- Assert.assertNotEquals(publisher3, publisher4);
+ // should get the same publisher
+ Assert.assertEquals(publisher3, publisher4);
// Check capabilities
Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.EMPTY_MAP));
Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP));
+
+ // Check data publisher is not closed
+ Assert.assertFalse(publisher1.isClosed());
+ Assert.assertFalse(publisher2.isClosed());
+ Assert.assertFalse(publisher3.isClosed());
+ Assert.assertFalse(publisher4.isClosed());
+ broker.close();
+ // Check all publishers are closed
+ Assert.assertTrue(publisher1.isClosed());
+ Assert.assertTrue(publisher2.isClosed());
+ Assert.assertTrue(publisher3.isClosed());
+ Assert.assertTrue(publisher4.isClosed());
}
@Test()
@@ -143,6 +158,9 @@ public class DataPublisherFactoryTest {
private static class TestNonThreadsafeDataPublisher extends DataPublisher {
+ @Getter
+ private boolean isClosed = false;
+
public TestNonThreadsafeDataPublisher(State state) {
super(state);
}
@@ -161,6 +179,7 @@ public class DataPublisherFactoryTest {
@Override
public void close() throws IOException {
+ isClosed = true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
index c5031fe..ff066f4 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.util.ConfigUtils;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
/**
@@ -55,6 +56,7 @@ import lombok.Data;
* SharedResourcesBrokerImpl<MyScopes> scopeBroker = topBroker.newSubscopedBuilder(scope, "scopeId").build();
* </pre>
*/
+@Slf4j
public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements SharedResourcesBroker<S> {
private final DefaultBrokerCache<S> brokerCache;
@@ -323,6 +325,8 @@ public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements Shared
@Override
public void close()
throws IOException {
+ ScopeInstance<S> scope = this.selfScopeWrapper.getScope();
+ log.info("Closing broker with scope {} of id {}.", scope.getType().toString(), scope.getScopeId());
this.brokerCache.close(this.selfScopeWrapper);
}
}