You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:44:11 UTC

[33/50] [abbrv] incubator-beam git commit: Improve teardown behavior in DoFnLifecycleManager

Improve teardown behavior in DoFnLifecycleManager

Use Cache invalidation hooks to tear down DoFns that are no longer in the
cache. Ensure that remove() and removeAll() report thrown exceptions
even though the exceptions are not thrown by the LoadingCache.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7239ebb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7239ebb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7239ebb0

Branch: refs/heads/gearpump-runner
Commit: 7239ebb0c76f539f476cea0b44b1070e765cca41
Parents: 79bb2c2
Author: Thomas Groh <tg...@google.com>
Authored: Mon Oct 24 13:43:43 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Oct 25 10:46:43 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DoFnLifecycleManager.java    | 56 +++++++++------
 .../direct/DoFnLifecycleManagerTest.java        | 74 ++++++++++++++++++--
 2 files changed, 104 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7239ebb0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 23460b6..472b28b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -21,17 +21,17 @@ package org.apache.beam.runners.direct;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import java.util.ArrayList;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import java.util.Collection;
-import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Setup;
 import org.apache.beam.sdk.transforms.DoFn.Teardown;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Manages {@link DoFn} setup, teardown, and serialization.
@@ -42,16 +42,18 @@ import org.slf4j.LoggerFactory;
  * clearing all cached {@link DoFn DoFns}.
  */
 class DoFnLifecycleManager {
-  private static final Logger LOG = LoggerFactory.getLogger(DoFnLifecycleManager.class);
-
   public static DoFnLifecycleManager of(DoFn<?, ?> original) {
     return new DoFnLifecycleManager(original);
   }
 
   private final LoadingCache<Thread, DoFn<?, ?>> outstanding;
+  private final ConcurrentMap<Thread, Exception> thrownOnTeardown;
 
   private DoFnLifecycleManager(DoFn<?, ?> original) {
-    this.outstanding = CacheBuilder.newBuilder().build(new DeserializingCacheLoader(original));
+    this.outstanding = CacheBuilder.newBuilder()
+        .removalListener(new TeardownRemovedFnListener())
+        .build(new DeserializingCacheLoader(original));
+    thrownOnTeardown = new ConcurrentHashMap<>();
   }
 
   public DoFn<?, ?> get() throws Exception {
@@ -61,8 +63,15 @@ class DoFnLifecycleManager {
 
   public void remove() throws Exception {
     Thread currentThread = Thread.currentThread();
-    DoFn<?, ?> fn = outstanding.asMap().remove(currentThread);
-    DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
+    outstanding.invalidate(currentThread);
+    // Block until the invalidate is fully completed
+    outstanding.cleanUp();
+    // Remove to try to avoid reporting the same teardown exception twice. May still double-report,
+    // but the second will be suppressed.
+    Exception thrown = thrownOnTeardown.remove(currentThread);
+    if (thrown != null) {
+      throw thrown;
+    }
   }
 
   /**
@@ -73,21 +82,13 @@ class DoFnLifecycleManager {
    * DoFn.Teardown @Teardown} method, and the {@link PipelineRunner} should throw an exception.
    */
   public Collection<Exception> removeAll() throws Exception {
-    Iterator<DoFn<?, ?>> fns = outstanding.asMap().values().iterator();
-    Collection<Exception> thrown = new ArrayList<>();
-    while (fns.hasNext()) {
-      DoFn<?, ?> fn = fns.next();
-      fns.remove();
-      try {
-        DoFnInvokers.INSTANCE.invokerFor(fn).invokeTeardown();
-      } catch (Exception e) {
-        thrown.add(e);
-      }
-    }
-    return thrown;
+    outstanding.invalidateAll();
+    // Make sure all of the teardowns are run
+    outstanding.cleanUp();
+    return thrownOnTeardown.values();
   }
 
-  private class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> {
+  private static class DeserializingCacheLoader extends CacheLoader<Thread, DoFn<?, ?>> {
     private final byte[] original;
 
     public DeserializingCacheLoader(DoFn<?, ?> original) {
@@ -102,4 +103,15 @@ class DoFnLifecycleManager {
       return fn;
     }
   }
+
+  private class TeardownRemovedFnListener implements RemovalListener<Thread, DoFn<?, ?>> {
+    @Override
+    public void onRemoval(RemovalNotification<Thread, DoFn<?, ?>> notification) {
+      try {
+        DoFnInvokers.INSTANCE.newByteBuddyInvoker(notification.getValue()).invokeTeardown();
+      } catch (Exception e) {
+        thrownOnTeardown.put(notification.getKey(), e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7239ebb0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
index aef9d29..59e1e16 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.theInstance;
 import static org.junit.Assert.assertThat;
@@ -34,8 +35,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.hamcrest.Matchers;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -44,6 +48,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class DoFnLifecycleManagerTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
   private TestFn fn = new TestFn();
   private DoFnLifecycleManager mgr = DoFnLifecycleManager.of(fn);
 
@@ -105,6 +111,17 @@ public class DoFnLifecycleManagerTest {
   }
 
   @Test
+  public void teardownThrowsRemoveThrows() throws Exception {
+    TestFn obtained = (TestFn) mgr.get();
+    obtained.teardown();
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalStateException.class));
+    thrown.expectMessage("Cannot call teardown: already torn down");
+    mgr.remove();
+  }
+
+  @Test
   public void teardownAllOnRemoveAll() throws Exception {
     CountDownLatch startSignal = new CountDownLatch(1);
     ExecutorService executor = Executors.newCachedThreadPool();
@@ -125,6 +142,38 @@ public class DoFnLifecycleManagerTest {
     }
   }
 
+  @Test
+  public void removeAndRemoveAllConcurrent() throws Exception {
+    CountDownLatch startSignal = new CountDownLatch(1);
+    ExecutorService executor = Executors.newCachedThreadPool();
+    List<Future<TestFn>> futures = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      futures.add(executor.submit(new GetFnCallable(mgr, startSignal)));
+    }
+    startSignal.countDown();
+    List<TestFn> fns = new ArrayList<>();
+    for (Future<TestFn> future : futures) {
+      fns.add(future.get(1L, TimeUnit.SECONDS));
+    }
+    CountDownLatch removeSignal = new CountDownLatch(1);
+    List<Future<Void>> removeFutures = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      // These will reuse the threads used in the GetFns
+      removeFutures.add(executor.submit(new TeardownFnCallable(mgr, removeSignal)));
+    }
+    removeSignal.countDown();
+    assertThat(mgr.removeAll(), Matchers.<Exception>emptyIterable());
+    for (Future<Void> removed : removeFutures) {
+      // Should not have thrown an exception.
+      removed.get();
+    }
+
+    for (TestFn fn : fns) {
+      assertThat(fn.setupCalled, is(true));
+      assertThat(fn.teardownCalled, is(true));
+    }
+  }
+
   private static class GetFnCallable implements Callable<TestFn> {
     private final DoFnLifecycleManager mgr;
     private final CountDownLatch startSignal;
@@ -141,6 +190,23 @@ public class DoFnLifecycleManagerTest {
     }
   }
 
+  private static class TeardownFnCallable implements Callable<Void> {
+    private final DoFnLifecycleManager mgr;
+    private final CountDownLatch startSignal;
+
+    private TeardownFnCallable(DoFnLifecycleManager mgr, CountDownLatch startSignal) {
+      this.mgr = mgr;
+      this.startSignal = startSignal;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      startSignal.await();
+      // Will throw an exception if the TestFn has already been removed from this thread
+      mgr.remove();
+      return null;
+    }
+  }
 
   private static class TestFn extends DoFn<Object, Object> {
     boolean setupCalled = false;
@@ -148,8 +214,8 @@ public class DoFnLifecycleManagerTest {
 
     @Setup
     public void setup() {
-      checkState(!setupCalled);
-      checkState(!teardownCalled);
+      checkState(!setupCalled, "Cannot call setup: already set up");
+      checkState(!teardownCalled, "Cannot call setup: already torn down");
 
       setupCalled = true;
     }
@@ -160,8 +226,8 @@ public class DoFnLifecycleManagerTest {
 
     @Teardown
     public void teardown() {
-      checkState(setupCalled);
-      checkState(!teardownCalled);
+      checkState(setupCalled, "Cannot call teardown: not set up");
+      checkState(!teardownCalled, "Cannot call teardown: already torn down");
 
       teardownCalled = true;
     }