You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/11 17:42:29 UTC

[1/2] incubator-beam git commit: Use an AtomicReference in InProcessSideInputContainer

Repository: incubator-beam
Updated Branches:
  refs/heads/master 351fc3efa -> be9b15803


Use an AtomicReference in InProcessSideInputContainer

This fixes a TOCTOU race in the contents-updating logic, where
determining that the current pane should replace the contents of the
side input and performing that replacement were not a single atomic
operation. Using an AtomicReference allows the use of compareAndSet to
ensure that the replacement only happens if the contents are still the
pane on which the decision to replace was made.

Fixes a race where a pane could be the latest and replace an existing
pane, but would then be lost because an earlier pane was written
between the invalidation and the reloading of the contents.

Fixes a race where a reader could incorrectly read an empty iterable as
the contents of a PCollectionView if the read occurred between the
invalidate and reload steps.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77475992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77475992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77475992

Branch: refs/heads/master
Commit: 7747599233aee96c8077c46db5ce093a856b6139
Parents: 4870525
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 10 11:27:37 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 10 15:02:29 2016 -0700

----------------------------------------------------------------------
 .../direct/InProcessSideInputContainer.java     | 199 +++++++++----------
 .../direct/InProcessSideInputContainerTest.java |  95 +++++----
 2 files changed, 147 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77475992/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
index 78889dc..1ef8f13 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -33,18 +33,16 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
@@ -54,36 +52,26 @@ import javax.annotation.Nullable;
  * available and writing to a {@link PCollectionView}.
  */
 class InProcessSideInputContainer {
-  private final InProcessEvaluationContext evaluationContext;
   private final Collection<PCollectionView<?>> containedViews;
-  private final LoadingCache<PCollectionViewWindow<?>,
-      SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
+  private final LoadingCache<
+          PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+      viewByWindows;
 
   /**
    * Create a new {@link InProcessSideInputContainer} with the provided views and the provided
    * context.
    */
   public static InProcessSideInputContainer create(
-      InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
-    CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
-        loader = new CacheLoader<PCollectionViewWindow<?>,
-            SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
-          @Override
-          public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
-              PCollectionViewWindow<?> view) {
-            return SettableFuture.create();
-          }
-        };
-    LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
-        viewByWindows = CacheBuilder.newBuilder().build(loader);
-    return new InProcessSideInputContainer(context, containedViews, viewByWindows);
+      final InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+    LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+        viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
+    return new InProcessSideInputContainer(containedViews, viewByWindows);
   }
 
-  private InProcessSideInputContainer(InProcessEvaluationContext context,
+  private InProcessSideInputContainer(
       Collection<PCollectionView<?>> containedViews,
-      LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
-      viewByWindows) {
-    this.evaluationContext = context;
+      LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+          viewByWindows) {
     this.containedViews = ImmutableSet.copyOf(containedViews);
     this.viewByWindows = viewByWindows;
   }
@@ -149,29 +137,75 @@ class InProcessSideInputContainer {
   private void updatePCollectionViewWindowValues(
       PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
     PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
-    SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
-    try {
-      future = viewByWindows.get(windowedView);
-      if (future.isDone()) {
-        Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
-        PaneInfo newPane = windowValues.iterator().next().getPane();
-        // The current value may have no elements, if no elements were produced for the window,
-        // but we are recieving late data.
-        if (!existingValues.hasNext()
-            || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
-          viewByWindows.invalidate(windowedView);
-          viewByWindows.get(windowedView).set(windowValues);
-        }
-      } else {
-        future.set(windowValues);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      if (future != null && !future.isDone()) {
-        future.set(Collections.<WindowedValue<?>>emptyList());
-      }
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e.getCause());
+    AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
+        viewByWindows.getUnchecked(windowedView);
+    if (contents.compareAndSet(null, windowValues)) {
+      // the value had never been set, so we set it and are done.
+      return;
+    }
+    PaneInfo newPane = windowValues.iterator().next().getPane();
+
+    Iterable<? extends WindowedValue<?>> existingValues;
+    long existingPane;
+    do {
+      existingValues = contents.get();
+      existingPane =
+          Iterables.isEmpty(existingValues)
+              ? -1L
+              : existingValues.iterator().next().getPane().getIndex();
+    } while (newPane.getIndex() > existingPane
+        && !contents.compareAndSet(existingValues, windowValues));
+  }
+
+  private static class CallbackSchedulingLoader extends
+      CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
+    private final InProcessEvaluationContext context;
+
+    public CallbackSchedulingLoader(
+        InProcessEvaluationContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public AtomicReference<Iterable<? extends WindowedValue<?>>>
+        load(PCollectionViewWindow<?> view) {
+
+      AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
+      WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();
+
+      context.scheduleAfterOutputWouldBeProduced(view.getView(),
+          view.getWindow(),
+          windowingStrategy,
+          new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
+      return contents;
+    }
+  }
+
+  private static class WriteEmptyViewContents implements Runnable {
+    private final PCollectionView<?> view;
+    private final BoundedWindow window;
+    private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;
+
+    private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
+        AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
+      this.contents = contents;
+      this.view = view;
+      this.window = window;
+    }
+
+    @Override
+    public void run() {
+      // The requested window has closed without producing elements, so reflect that in
+      // the PCollectionView. If set has already been called, will do nothing.
+      contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("view", view)
+          .add("window", window)
+          .toString();
     }
   }
 
@@ -190,43 +224,23 @@ class InProcessSideInputContainer {
               + "Contained views; %s",
           view,
           readerViews);
-      return getViewFuture(view, window).isDone();
+      return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null;
     }
 
     @Override
     @Nullable
     public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-      checkArgument(
-          readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
-      try {
-        final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
-        // Safe covariant cast
-        @SuppressWarnings("unchecked")
-        Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
-        return view.fromIterableInternal(values);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * Gets the future containing the contents of the provided {@link PCollectionView} in the
-     * provided {@link BoundedWindow}, setting up a callback to populate the future with empty
-     * contents if necessary.
-     */
-    private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
-        final PCollectionView<T> view, final BoundedWindow window)  {
-      PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
-      final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
-          viewByWindows.getUnchecked(windowedView);
-
-      WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
-      evaluationContext.scheduleAfterOutputWouldBeProduced(
-          view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
-      return future;
+      checkArgument(readerViews.contains(view),
+          "calling get(PCollectionView) with unknown view: " + view);
+      checkArgument(isReady(view, window),
+          "calling get(PCollectionView) with view %s that is not ready in window %s",
+          view,
+          window);
+      // Safe covariant cast
+      @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
+          (Iterable<WindowedValue<?>>) viewByWindows
+              .getUnchecked(PCollectionViewWindow.of(view, window)).get();
+      return view.fromIterableInternal(values);
     }
 
     @Override
@@ -240,31 +254,4 @@ class InProcessSideInputContainer {
     }
   }
 
-  private static class WriteEmptyViewContents implements Runnable {
-    private final PCollectionView<?> view;
-    private final BoundedWindow window;
-    private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;
-
-    private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
-        SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
-      this.future = future;
-      this.view = view;
-      this.window = window;
-    }
-
-    @Override
-    public void run() {
-      // The requested window has closed without producing elements, so reflect that in
-      // the PCollectionView. If set has already been called, will do nothing.
-      future.set(Collections.<WindowedValue<?>>emptyList());
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("view", view)
-          .add("window", window)
-          .toString();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77475992/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
index 8f89e70..2f376dd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 
 import org.apache.beam.sdk.coders.KvCoder;
@@ -61,8 +62,10 @@ import org.mockito.stubbing.Answer;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Tests for {@link InProcessSideInputContainer}.
@@ -194,46 +197,12 @@ public class InProcessSideInputContainerTest {
    * there is data in the pane.
    */
   @Test
-  public void getBlocksUntilPaneAvailable() throws Exception {
-    BoundedWindow window =
-        new BoundedWindow() {
-          @Override
-          public Instant maxTimestamp() {
-            return new Instant(1024L);
-          }
-        };
-    Future<Double> singletonFuture =
-        getFutureOfView(
-            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
-            singletonView,
-            window);
-
-    WindowedValue<Double> singletonValue =
-        WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
-    assertThat(singletonFuture.isDone(), is(false));
-    container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
-    assertThat(singletonFuture.get(), equalTo(4.75));
-  }
-
-  @Test
-  public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception {
-    BoundedWindow window = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(1024L);
-      }
-    };
-    SideInputReader newReader =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
-    Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window);
-
-    WindowedValue<Double> singletonValue =
-        WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+  public void getNotReadyThrows() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("not ready");
 
-    assertThat(singletonFuture.isDone(), is(false));
-    container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
-    assertThat(singletonFuture.get(), equalTo(24.125));
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        .get(mapView, GlobalWindow.INSTANCE);
   }
 
   @Test
@@ -448,15 +417,20 @@ public class InProcessSideInputContainerTest {
   }
 
   @Test
-  public void isReadyForEmptyWindowTrue() {
+  public void isReadyForEmptyWindowTrue() throws Exception {
+    CountDownLatch onComplete = new CountDownLatch(1);
     immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+    CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete);
 
     ReadyCheckingSideInputReader reader =
         container.createReaderForViews(ImmutableList.of(mapView, singletonView));
     assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
     assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
 
-    immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE);
+    latch.countDown();
+    if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
+      fail("Callback to set empty values did not complete!");
+    }
     assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
   }
 
@@ -483,6 +457,45 @@ public class InProcessSideInputContainerTest {
             Mockito.any(Runnable.class));
   }
 
+  /**
+   * When a callAfterWindowCloses with the specified view's producing transform, window, and
+   * windowing strategy is invoked, start a thread that will invoke the callback after the returned
+   * {@link CountDownLatch} is counted down once.
+   */
+  private CountDownLatch invokeLatchedCallback(
+      PCollectionView<?> view, BoundedWindow window, final CountDownLatch onComplete) {
+    final CountDownLatch runLatch = new CountDownLatch(1);
+    doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            Object callback = invocation.getArguments()[3];
+            final Runnable callbackRunnable = (Runnable) callback;
+            Executors.newSingleThreadExecutor().submit(new Runnable() {
+              public void run() {
+                try {
+                  if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) {
+                    fail("Run latch didn't count down within timeout");
+                  }
+                  callbackRunnable.run();
+                  onComplete.countDown();
+                } catch (InterruptedException e) {
+                  fail("Unexpectedly interrupted while waiting for latch to be counted down");
+                }
+              }
+            });
+            return null;
+          }
+        })
+        .when(context)
+        .scheduleAfterOutputWouldBeProduced(
+            Mockito.eq(view),
+            Mockito.eq(window),
+            Mockito.eq(view.getWindowingStrategyInternal()),
+            Mockito.any(Runnable.class));
+    return runLatch;
+  }
+
   private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
       final PCollectionView<ValueT> view, final BoundedWindow window) {
     Callable<ValueT> callable = new Callable<ValueT>() {


[2/2] incubator-beam git commit: This closes #318

Posted by ke...@apache.org.
This closes #318


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be9b1580
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be9b1580
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be9b1580

Branch: refs/heads/master
Commit: be9b1580316595868918be76b360fefa02bc7f13
Parents: 351fc3e 7747599
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 11 10:42:15 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 11 10:42:15 2016 -0700

----------------------------------------------------------------------
 .../direct/InProcessSideInputContainer.java     | 199 +++++++++----------
 .../direct/InProcessSideInputContainerTest.java |  95 +++++----
 2 files changed, 147 insertions(+), 147 deletions(-)
----------------------------------------------------------------------