You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:43 UTC

[25/50] incubator-gobblin git commit: [GOBBLIN-405] Fix race condition with access to immediately invalidated resources

[GOBBLIN-405] Fix race condition with access to immediately invalidated resources

Closes #2280 from htran1/broker_cache_race


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bde5bb1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bde5bb1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bde5bb1f

Branch: refs/heads/0.12.0
Commit: bde5bb1f9d2eb310b6a16d52bad383eefaf0d75c
Parents: 19b2d81
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Feb 7 10:19:45 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 7 10:19:45 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/broker/ResourceEntry.java    | 17 ++++
 .../publisher/DataPublisherFactoryTest.java     | 48 +++++++++++
 .../gobblin/broker/DefaultBrokerCache.java      | 87 ++++++++++++++------
 .../broker/ImmediatelyInvalidResourceEntry.java | 15 +++-
 4 files changed, 141 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
index 6402391..cbdebe7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/ResourceEntry.java
@@ -44,4 +44,21 @@ public interface ResourceEntry<T> extends SharedResourceFactoryResponse<T> {
    * key, blocking all requests for that key. As suck, this method should be reasonably fast.
    */
   void onInvalidate();
+
+  /**
+   * Return the resource only if this entry is valid at the time of the call.
+   *
+   * <p>If every caller obtains the resource through this method, implementations must make the validity check and
+   * the fetch atomic with respect to any validity state change. This avoids races where the state changes while the
+   * resource is being obtained, e.g. for resources that may only be handed out a limited number of times.
+   *
+   * <p>The default implementation simply calls {@link #isValid()} and then {@link #getResource()} with no
+   * synchronization, so entries whose validity changes on access should override it.
+   *
+   * @return the valid resource, or {@code null} if this entry is not valid
+   */
+  default T getResourceIfValid() {
+    // not atomic: validity may change between these two calls
+    return isValid() ? getResource() : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
index b2cd739..6f58a50 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/DataPublisherFactoryTest.java
@@ -17,9 +17,16 @@
 package org.apache.gobblin.publisher;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -94,6 +101,47 @@ public class DataPublisherFactoryTest {
     Assert.assertTrue(publisher1.supportsCapability(Capability.THREADSAFE, Collections.EMPTY_MAP));
   }
 
+  @Test
+  public void testMultiThreadedGetNonThreadSafePublisher()
+      throws InterruptedException, ExecutionException, IOException {
+    SharedResourcesBroker broker = SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker(
+        ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+    ExecutorService service = Executors.newFixedThreadPool(40);
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < 100000; i++) {
+        futures.add(service.submit(new GetNonThreadSafePublisher(broker)));
+      }
+      // get() rethrows, wrapped in ExecutionException, any assertion failure raised on a worker thread
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } finally {
+      service.shutdown();
+      service.awaitTermination(100, TimeUnit.SECONDS);
+    }
+  }
+
+  /** Looks up a {@code TestNonThreadsafeDataPublisher} through the shared broker and asserts one was returned. */
+  private static class GetNonThreadSafePublisher implements Runnable {
+    private final SharedResourcesBroker broker;
+
+    GetNonThreadSafePublisher(SharedResourcesBroker broker) {
+      this.broker = broker;
+    }
+
+    @Override
+    public void run() {
+      try {
+        DataPublisher publisher = DataPublisherFactory.get(TestNonThreadsafeDataPublisher.class.getName(), null, this.broker);
+        Assert.assertNotNull(publisher);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
   private static class TestNonThreadsafeDataPublisher extends DataPublisher {
     public TestNonThreadsafeDataPublisher(State state) {
       super(state);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
index 6425bab..0c001f1 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/DefaultBrokerCache.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.Striped;
 
 import org.apache.gobblin.broker.iface.ScopeType;
 import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
 import org.apache.gobblin.broker.iface.SharedResourceKey;
 import org.apache.gobblin.broker.iface.NoSuchScopeException;
 
@@ -98,42 +99,78 @@ class DefaultBrokerCache<S extends ScopeType<S>> {
   }
 
   /**
-   * Get an object for the specified factory, key, scope, and broker. {@link DefaultBrokerCache}
-   * guarantees that calling this method for the same factory, key, and scope will return the same object.
+   * Get the raw cached factory response, creating it on a cache miss. Does not follow redirects or check validity.
    */
   @SuppressWarnings(value = "unchecked")
-  <T, K extends SharedResourceKey> T getScoped(final SharedResourceFactory<T, K, S> factory, @Nonnull final K key,
+  <T, K extends SharedResourceKey> SharedResourceFactoryResponse<T> getScopedFromCache(
+      final SharedResourceFactory<T, K, S> factory, @Nonnull final K key,
       @Nonnull final ScopeWrapper<S> scope, final SharedResourcesBrokerImpl<S> broker)
       throws ExecutionException {
-
     RawJobBrokerKey fullKey = new RawJobBrokerKey(scope, factory.getName(), key);
     Object obj = this.sharedResourceCache.get(fullKey, new Callable<Object>() {
       @Override
       public Object call() throws Exception {
-        return factory.createResource(broker.getScopedView(scope.getType()), broker.getConfigView(scope.getType(), key, factory.getName()));
+        return factory.createResource(broker.getScopedView(scope.getType()), broker.getConfigView(scope.getType(), key,
+            factory.getName()));
       }
     });
-    if (obj instanceof ResourceCoordinate) {
-      ResourceCoordinate<T, K, S> resourceCoordinate = (ResourceCoordinate<T, K, S>) obj;
-      if (!SharedResourcesBrokerUtils.isScopeTypeAncestor((ScopeType) scope.getType(), ((ResourceCoordinate) obj).getScope())) {
-        throw new RuntimeException(String.format("%s returned an invalid coordinate: scope %s is not an ancestor of %s.",
-            factory.getName(), ((ResourceCoordinate) obj).getScope(), scope.getType()));
-      }
-      try {
-        return getScoped(resourceCoordinate.getFactory(), resourceCoordinate.getKey(),
-            broker.getWrappedScope(resourceCoordinate.getScope()), broker);
-      } catch (NoSuchScopeException nsse) {
-        throw new RuntimeException(String.format("%s returned an invalid coordinate: scope %s is not available.",
-            factory.getName(), resourceCoordinate.getScope().name()), nsse);
-      }
-    } else if (obj instanceof ResourceEntry) {
-      if (!((ResourceEntry) obj).isValid()) {
-        safeInvalidate(fullKey);
-        return getScoped(factory, key, scope, broker);
+
+    return (SharedResourceFactoryResponse<T>)obj;
+  }
+
+  /**
+   * Get an object for the specified factory, key, scope, and broker, following {@link ResourceCoordinate} redirects
+   * and reloading invalidated entries. Repeated calls return the same object while its cache entry remains valid.
+   */
+  @SuppressWarnings(value = "unchecked")
+  <T, K extends SharedResourceKey> T getScoped(final SharedResourceFactory<T, K, S> factory, @Nonnull final K key,
+      @Nonnull final ScopeWrapper<S> scope, final SharedResourcesBrokerImpl<S> broker)
+      throws ExecutionException {
+    // cache coordinates of the entry currently being resolved; moved along whenever a ResourceCoordinate redirects
+    SharedResourceFactory<T, K, S> currentFactory = factory;
+    K currentKey = key;
+    ScopeWrapper<S> currentScope = scope;
+    Object obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker);
+
+    // this loop is to continue looking up objects through redirection or reloading until a valid resource is found
+    while (true) {
+      if (obj instanceof ResourceCoordinate) {
+        ResourceCoordinate<T, K, S> resourceCoordinate = (ResourceCoordinate<T, K, S>) obj;
+        if (!SharedResourcesBrokerUtils.isScopeTypeAncestor((ScopeType) currentScope.getType(), ((ResourceCoordinate) obj).getScope())) {
+          throw new RuntimeException(String
+              .format("%s returned an invalid coordinate: scope %s is not an ancestor of %s.", currentFactory.getName(),
+                  ((ResourceCoordinate) obj).getScope(), currentScope.getType()));
+        }
+        try {
+          currentScope = broker.getWrappedScope(resourceCoordinate.getScope());
+        } catch (NoSuchScopeException nsse) {
+          throw new RuntimeException(String
+              .format("%s returned an invalid coordinate: scope %s is not available.", currentFactory.getName(),
+                  resourceCoordinate.getScope().name()), nsse);
+        }
+        // track the redirect target so that an invalid entry found there is the one invalidated and reloaded
+        currentFactory = resourceCoordinate.getFactory();
+        currentKey = resourceCoordinate.getKey();
+        obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker);
+      } else if (obj instanceof ResourceEntry) {
+        // getResourceIfValid() lets entries make the validity check and the fetch atomic
+        T resource = ((ResourceEntry<T>) obj).getResourceIfValid();
+        if (resource != null) {
+          return resource;
+        }
+        // resource is invalid. The lock reduces the chance of starvation where a thread keeps getting objects that
+        // are invalidated by another thread; lock() is outside the try so unlock() only runs once it is held.
+        Lock lock = this.invalidationLock.get(currentKey);
+        lock.lock();
+        try {
+          safeInvalidate(new RawJobBrokerKey(currentScope, currentFactory.getName(), currentKey));
+          obj = getScopedFromCache(currentFactory, currentKey, currentScope, broker);
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        throw new RuntimeException(String.format("Invalid response from %s: %s.", currentFactory.getName(), obj.getClass()));
       }
-      return ((ResourceEntry<T>) obj).getResource();
-    } else {
-      throw new RuntimeException(String.format("Invalid response from %s: %s.", factory.getName(), obj.getClass()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bde5bb1f/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
index b3f8502..ccb569c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/ImmediatelyInvalidResourceEntry.java
@@ -36,7 +36,7 @@ public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> {
   }
 
   @Override
-  public T getResource() {
+  public synchronized T getResource() {
     // mark the object as invalid before returning so that a new one will be created on the next
     // request from the factory
     this.valid = false;
@@ -53,4 +53,17 @@ public class ImmediatelyInvalidResourceEntry<T> extends ResourceInstance<T> {
   public void onInvalidate() {
     // these type of resource cannot be closed on invalidation since the lifetime can't be determined
   }
+
+  /**
+   * Atomically checks validity and obtains the resource. This method and {@link #getResource()} both synchronize on
+   * this entry, and {@link #getResource()} clears the valid flag, so concurrent callers cannot both pass the check
+   * for the same valid state: one gets the resource and the others get {@code null}.
+   *
+   * @return the resource if this entry was still valid, otherwise {@code null}
+   */
+  @Override
+  public synchronized T getResourceIfValid() {
+    // re-entering this entry's monitor in getResource() is allowed for intrinsic locks
+    return this.valid ? getResource() : null;
+  }
 }