You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:43 UTC
[25/50] incubator-gobblin git commit: [GOBBLIN-405] Fix race
condition with access to immediately invalidated resources
[GOBBLIN-405] Fix race condition with access to immediately invalidated resources
Closes #2280 from htran1/broker_cache_race
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bde5bb1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bde5bb1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bde5bb1f
Branch: refs/heads/0.12.0
Commit: bde5bb1f9d2eb310b6a16d52bad383eefaf0d75c
Parents: 19b2d81
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Feb 7 10:19:45 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 7 10:19:45 2018 -0800
----------------------------------------------------------------------
.../apache/gobblin/broker/ResourceEntry.java | 17 ++++
.../publisher/DataPublisherFactoryTest.java | 48 +++++++++++
.../gobblin/broker/DefaultBrokerCache.java | 87 ++++++++++++++------
.../broker/ImmediatelyInvalidResourceEntry.java | 15 +++-
4 files changed, 141 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
index 6402391..cbdebe7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
@@ -44,4 +44,21 @@ public interface ResourceEntry<T> extends SharedResourceFactoryResponse<T> {
* key, blocking all requests for that key. As such, this method should be reasonably fast.
*/
void onInvalidate();
+
+  /**
+   * Returns the resource only if this entry is still valid.
+   *
+   * Implementations should guarantee that, as long as every caller obtains the resource through this method, the
+   * validity check and the retrieval happen atomically with respect to any change of the validity state. This avoids
+   * race conditions for entries whose state changes when the resource is retrieved, for example resources that can
+   * only be used a certain number of times.
+   *
+   * This default implementation calls {@link #isValid()} followed by {@link #getResource()} without any
+   * synchronization, so entries whose validity can change concurrently need to override it.
+   *
+   * @return null if the object is not valid, otherwise the valid object
+   */
+  default T getResourceIfValid() {
+    return isValid() ? getResource() : null;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
index b2cd739..6f58a50 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
@@ -17,9 +17,16 @@
package org.apache.gobblin.publisher;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -94,6 +101,47 @@ public class DataPublisherFactoryTest {
Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP));
}
+  /**
+   * Requests a non-thread-safe publisher from one shared broker from many threads at once. Every task asserts that
+   * it received a non-null publisher; a failure inside a task surfaces here through {@link Future#get()}.
+   */
+  @Test
+  public void testMultiThreadedGetNonThreadSafePublisher()
+      throws InterruptedException, ExecutionException, IOException {
+    SharedResourcesBroker broker =
+        SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(ConfigFactory.empty(),
+            SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    ExecutorService service = Executors.newFixedThreadPool(40);
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+
+      for (int i = 0; i < 100000; i++) {
+        futures.add(service.submit(new GetNonThreadSafePublisher(broker)));
+      }
+
+      // get() rethrows anything raised inside a task wrapped in an ExecutionException
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } finally {
+      // always stop the pool so that a failing task does not leave the worker threads running
+      service.shutdown();
+    }
+    Assert.assertTrue(service.awaitTermination(100, TimeUnit.SECONDS));
+  }
+
+  /**
+   * Task that requests a {@link TestNonThreadsafeDataPublisher} through the broker it was given and asserts that a
+   * publisher was returned.
+   */
+  private static class GetNonThreadSafePublisher implements Runnable {
+    private final SharedResourcesBroker broker;
+
+    GetNonThreadSafePublisher(SharedResourcesBroker broker) {
+      this.broker = broker;
+    }
+
+    @Override
+    public void run() {
+      try {
+        DataPublisher publisher =
+            DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, this.broker);
+        Assert.assertNotNull(publisher);
+      } catch (IOException e) {
+        // Runnable.run() cannot throw checked exceptions, so rethrow unchecked with the cause preserved
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
private static class TestNonThreadsafeDataPublisher extends DataPublisher {
public TestNonThreadsafeDataPublisher(State state) {
super(state);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
index 6425bab..0c001f1 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Striped;
import org.apache.gobblin.broker.iface.ScopeType;
import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
import org.apache.gobblin.broker.iface.SharedResourceKey;
import org.apache.gobblin.broker.iface.NoSuchScopeException;
@@ -98,42 +99,78 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
}
/**
- * Get an object for the specified factory, key, scope, and broker. {@link DefaultBrokerCache}
- * guarantees that calling this method for the same factory, key, and scope will return the same object.
+ * Get a scoped object from the cache.
+ *
+ * The entry is looked up under a key built from the scope, the factory name, and the resource key. The lookup is
+ * given a loader that asks the factory to create the resource, using the broker's scoped view and config view for
+ * the scope's type; presumably the cache only invokes that loader when no entry exists yet.
+ *
+ * The response is returned as is: redirections are not followed and validity is not checked here.
+ *
+ * @param factory factory whose name is part of the cache key and which the loader calls to create the resource
+ * @param key key of the requested resource
+ * @param scope scope under which the resource is cached
+ * @param broker broker providing the scoped view and config view that are passed to the factory
+ * @return the object held by the cache for this key, cast to a factory response
+ * @throws ExecutionException declared by the cache lookup; presumably wraps failures of the loader
*/
@SuppressWarnings(value = "unchecked")
- <T, K extends SharedResourceKey> T getScoped(final SharedResourceFactory<T, K, S> factory, @Nonnull final K key,
+ <T, K extends SharedResourceKey> SharedResourceFactoryResponse<T> getScopedFromCache(
+ final SharedResourceFactory<T, K, S> factory, @Nonnull final K key,
@Nonnull final ScopeWrapper<S> scope, final SharedResourcesBrokerImpl<S> broker)
throws ExecutionException {
-
RawJobBrokerKey fullKey = new RawJobBrokerKey(scope, factory.getName(), key);
Object obj = this.sharedResourceCache.get(fullKey, new Callable<Object>() {
@Override
public Object call() throws Exception {
- return factory.createResource(broker.getScopedView(scope.getType()), broker.getConfigView(scope.getType(), key, factory.getName()));
+ return factory.createResource(broker.getScopedView(scope.getType()), broker.getConfigView(scope.getType(), key,
+ factory.getName()));
}
});
- if (obj instanceof ResourceCoordinate) {
- ResourceCoordinate<T, K, S> resourceCoordinate = (ResourceCoordinate<T, K, S>) obj;
- if (!SharedResourcesBrokerUtils.isScopeTypeAncestor((ScopeType) scope.getType(), ((ResourceCoordinate) obj).getScope())) {
- throw new RuntimeException(String.format("%s returned an invalid coordinate: scope %s is not an ancestor of %s.",
- factory.getName(), ((ResourceCoordinate) obj).getScope(), scope.getType()));
- }
- try {
- return getScoped(resourceCoordinate.getFactory(), resourceCoordinate.getKey(),
- broker.getWrappedScope(resourceCoordinate.getScope()), broker);
- } catch (NoSuchScopeException nsse) {
- throw new RuntimeException(String.format("%s returned an invalid coordinate: scope %s is not available.",
- factory.getName(), resourceCoordinate.getScope().name()), nsse);
- }
- } else if (obj instanceof ResourceEntry) {
- if (!((ResourceEntry) obj).isValid()) {
- safeInvalidate(fullKey);
- return getScoped(factory, key, scope, broker);
+
+ return (SharedResourceFactoryResponse<T>)obj;
+ }
+
+  /**
+   * Get an object for the specified factory, key, scope, and broker. {@link DefaultBrokerCache}
+   * guarantees that calling this method for the same factory, key, and scope will return the same object.
+   *
+   * The lookup follows {@link ResourceCoordinate} redirections and reloads invalid {@link ResourceEntry}s until a
+   * valid resource is found. The factory, key, and scope currently being resolved are tracked across redirections,
+   * so that an invalid entry is invalidated and reloaded under the coordinates it was actually fetched from.
+   *
+   * @param factory factory to request the resource from
+   * @param key key of the requested resource
+   * @param scope scope in which the resource is requested
+   * @param broker broker used to resolve scopes and to create resources
+   * @return a valid resource
+   * @throws ExecutionException if a cache lookup fails
+   * @throws RuntimeException if a factory returns a coordinate whose scope is not an ancestor of the current scope
+   *         or is not available, or if it returns a response of an unsupported type
+   */
+  @SuppressWarnings(value = "unchecked")
+  <T, K extends SharedResourceKey> T getScoped(final SharedResourceFactory<T, K, S> factory, @Nonnull final K key,
+      @Nonnull final ScopeWrapper<S> scope, final SharedResourcesBrokerImpl<S> broker)
+      throws ExecutionException {
+    SharedResourceFactory<T, K, S> currentFactory = factory;
+    K currentKey = key;
+    ScopeWrapper<S> currentScope = scope;
+
+    Object obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker);
+
+    // this loop is to continue looking up objects through redirection or reloading until a valid resource is found
+    while (true) {
+      if (obj instanceof ResourceCoordinate) {
+        ResourceCoordinate<T, K, S> resourceCoordinate = (ResourceCoordinate<T, K, S>) obj;
+        if (!SharedResourcesBrokerUtils.isScopeTypeAncestor((ScopeType) currentScope.getType(), ((ResourceCoordinate) obj).getScope())) {
+          throw new RuntimeException(String
+              .format("%s returned an invalid coordinate: scope %s is not an ancestor of %s.", currentFactory.getName(),
+                  ((ResourceCoordinate) obj).getScope(), currentScope.getType()));
+        }
+        try {
+          ScopeWrapper<S> redirectedScope = broker.getWrappedScope(resourceCoordinate.getScope());
+          obj = getScopedFromCache(resourceCoordinate.getFactory(), resourceCoordinate.getKey(), redirectedScope,
+              broker);
+          // the redirect succeeded: from here on the redirected coordinates are the ones being resolved, so a later
+          // invalidation or reload must target them and not the coordinates of the original request
+          currentFactory = resourceCoordinate.getFactory();
+          currentKey = resourceCoordinate.getKey();
+          currentScope = redirectedScope;
+        } catch (NoSuchScopeException nsse) {
+          // currentFactory still refers to the factory that returned the offending coordinate
+          throw new RuntimeException(String
+              .format("%s returned an invalid coordinate: scope %s is not available.", currentFactory.getName(),
+                  resourceCoordinate.getScope().name()), nsse);
+        }
+      } else if (obj instanceof ResourceEntry) {
+        T resource = ((ResourceEntry<T>) obj).getResourceIfValid();
+
+        // valid resource found
+        if (resource != null) {
+          return resource;
+        }
+
+        // resource is invalid. The lock in this block is to reduce the chance of starvation where a thread keeps
+        // getting objects that are invalidated by another thread.
+        Lock lock = this.invalidationLock.get(currentKey);
+        // acquire outside of the try block so that unlock() only runs when the lock is actually held
+        lock.lock();
+        try {
+          RawJobBrokerKey fullKey = new RawJobBrokerKey(currentScope, currentFactory.getName(), currentKey);
+          safeInvalidate(fullKey);
+          obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker);
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        throw new RuntimeException(String.format("Invalid response from %s: %s.", currentFactory.getName(), obj.getClass()));
+      }
- return ((ResourceEntry<T>) obj).getResource();
- } else {
- throw new RuntimeException(String.format("Invalid response from %s: %s.", factory.getName(), obj.getClass()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
index b3f8502..ccb569c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
@@ -36,7 +36,7 @@ public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> {
}
@Override
- public T getResource() {
+ public synchronized T getResource() {
// mark the object as invalid before returning so that a new one will be created on the next
// request from the factory
this.valid = false;
@@ -53,4 +53,17 @@ public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> {
public void onInvalidate() {
// this type of resource cannot be closed on invalidation since its lifetime can't be determined
}
+
+  /**
+   * Returns the resource if this entry is still valid.
+   *
+   * This method is synchronized so that, for callers of this method, the validity check and the validity change made
+   * by {@link #getResource()} form one atomic step.
+   *
+   * @return the resource if this entry is valid, otherwise null
+   */
+  @Override
+  public synchronized T getResourceIfValid() {
+    return this.valid ? getResource() : null;
+  }
}