You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/22 21:14:58 UTC

incubator-gobblin git commit: [GOBBLIN-435] Fix data publisher created from job broker not closed

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f51bf00b4 -> d323f6022


[GOBBLIN-435] Fix data publisher created from job broker not closed

Closes #2312 from zxcware/audit


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d323f602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d323f602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d323f602

Branch: refs/heads/master
Commit: d323f60222534ddff42beaf08716040246a999c3
Parents: f51bf00
Author: zhchen <zh...@linkedin.com>
Authored: Thu Mar 22 14:14:53 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 22 14:14:53 2018 -0700

----------------------------------------------------------------------
 .../gobblin/publisher/DataPublisherFactory.java |  3 +-
 .../publisher/DataPublisherFactoryTest.java     | 39 +++++++++++++++-----
 .../broker/SharedResourcesBrokerImpl.java       |  4 ++
 3 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
index 8d77fd6..ea228a7 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/DataPublisherFactory.java
@@ -78,6 +78,7 @@ public class DataPublisherFactory<S extends ScopeType<S>>
       State state = key.getState();
       Class<? extends DataPublisher> dataPublisherClass =  (Class<? extends DataPublisher>) Class
           .forName(publisherClassName);
+      log.info("Creating data publisher with class {} in scope {}. ", publisherClassName, config.getScope().toString());
 
       DataPublisher publisher = DataPublisher.getInstance(dataPublisherClass, state);
 
@@ -97,6 +98,6 @@ public class DataPublisherFactory<S extends ScopeType<S>>
 
   @Override
   public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, DataPublisherKey> config) {
-    return broker.selfScope().getType().rootScope();
+    return broker.selfScope().getType();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
index 6f58a50..5c65fc4 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
@@ -43,6 +43,9 @@ import org.apache.gobblin.capability.Capability;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 
+import lombok.Getter;
+
+
 /**
  * Tests for DataPublisherFactory
  */
@@ -76,29 +79,41 @@ public class DataPublisherFactoryTest {
     SharedResourcesBroker<SimpleScopeType> localBroker1 =
         broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local1")).build();
 
-    DataPublisher publisher1 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
-    DataPublisher publisher2 = DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
+    TestThreadsafeDataPublisher publisher1 = (TestThreadsafeDataPublisher)DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
+    TestThreadsafeDataPublisher publisher2 = (TestThreadsafeDataPublisher)DataPublisherFactory.get(TestThreadsafeDataPublisher.class.getName(), null, broker);
 
     // should get the same publisher
     Assert.assertEquals(publisher1, publisher2);
 
-    DataPublisher publisher3 =
-        localBroker1.getSharedResource(new DataPublisherFactory<>(),
+    TestThreadsafeDataPublisher publisher3 =
+        (TestThreadsafeDataPublisher)localBroker1.getSharedResource(new DataPublisherFactory<>(),
             new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null));
 
-    // should get the same publisher
-    Assert.assertEquals(publisher2, publisher3);
+    // should not get the same publisher
+    Assert.assertNotEquals(publisher2, publisher3);
 
-    DataPublisher publisher4 =
-        localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(),
+    TestThreadsafeDataPublisher publisher4 =
+        (TestThreadsafeDataPublisher)localBroker1.getSharedResourceAtScope(new DataPublisherFactory<>(),
             new DataPublisherKey(TestThreadsafeDataPublisher.class.getName(), null), SimpleScopeType.LOCAL);
 
-    // should get a different publisher
-    Assert.assertNotEquals(publisher3, publisher4);
+    // should get the same publisher
+    Assert.assertEquals(publisher3, publisher4);
 
     // Check capabilities
     Assert.assertTrue(publisher1.supportsCapability(DataPublisher.REUSABLE, Collections.EMPTY_MAP));
     Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP));
+
+    // Check data publisher is not closed
+    Assert.assertFalse(publisher1.isClosed());
+    Assert.assertFalse(publisher2.isClosed());
+    Assert.assertFalse(publisher3.isClosed());
+    Assert.assertFalse(publisher4.isClosed());
+    broker.close();
+    // Check all publishers are closed
+    Assert.assertTrue(publisher1.isClosed());
+    Assert.assertTrue(publisher2.isClosed());
+    Assert.assertTrue(publisher3.isClosed());
+    Assert.assertTrue(publisher4.isClosed());
   }
 
   @Test()
@@ -143,6 +158,9 @@ public class DataPublisherFactoryTest {
 
 
   private static class TestNonThreadsafeDataPublisher extends DataPublisher {
+    @Getter
+    private boolean isClosed = false;
+
     public TestNonThreadsafeDataPublisher(State state) {
       super(state);
     }
@@ -161,6 +179,7 @@ public class DataPublisherFactoryTest {
 
     @Override
     public void close() throws IOException {
+      isClosed = true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d323f602/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
index c5031fe..ff066f4 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerImpl.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 
 
 /**
@@ -55,6 +56,7 @@ import lombok.Data;
  *  SharedResourcesBrokerImpl<MyScopes> scopeBroker = topBroker.newSubscopedBuilder(scope, "scopeId").build();
  * </pre>
  */
+@Slf4j
 public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements SharedResourcesBroker<S> {
 
   private final DefaultBrokerCache<S> brokerCache;
@@ -323,6 +325,8 @@ public class SharedResourcesBrokerImpl<S extends ScopeType<S>> implements Shared
   @Override
   public void close()
       throws IOException {
+    ScopeInstance<S> scope = this.selfScopeWrapper.getScope();
+    log.info("Closing broker with scope {} of id {}.", scope.getType().toString(), scope.getScopeId());
     this.brokerCache.close(this.selfScopeWrapper);
   }
 }