You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/11 17:42:29 UTC
[1/2] incubator-beam git commit: Use an AtomicReference in
InProcessSideInputContainer
Repository: incubator-beam
Updated Branches:
refs/heads/master 351fc3efa -> be9b15803
Use an AtomicReference in InProcessSideInputContainer
This fixes a TOCTOU race in the contents-updating logic, where the
check that the current pane should replace the contents of the side
input and the replacement itself were not a single atomic operation.
Using an AtomicReference allows compareAndSet to ensure that the
replacement only succeeds if the contents are still the pane on which
the decision to replace was based.
Fixes a race where the latest pane could replace an existing pane but
then be lost, because an earlier pane was written between the
invalidation and the reloading of the contents.
Fixes a race where a reader could incorrectly read an empty iterable as
the contents of a PCollectionView if the read occurred between the
invalidate and reload steps.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77475992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77475992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77475992
Branch: refs/heads/master
Commit: 7747599233aee96c8077c46db5ce093a856b6139
Parents: 4870525
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 10 11:27:37 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 10 15:02:29 2016 -0700
----------------------------------------------------------------------
.../direct/InProcessSideInputContainer.java | 199 +++++++++----------
.../direct/InProcessSideInputContainerTest.java | 95 +++++----
2 files changed, 147 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77475992/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
index 78889dc..1ef8f13 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -33,18 +33,16 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -54,36 +52,26 @@ import javax.annotation.Nullable;
* available and writing to a {@link PCollectionView}.
*/
class InProcessSideInputContainer {
- private final InProcessEvaluationContext evaluationContext;
private final Collection<PCollectionView<?>> containedViews;
- private final LoadingCache<PCollectionViewWindow<?>,
- SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
+ private final LoadingCache<
+ PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows;
/**
* Create a new {@link InProcessSideInputContainer} with the provided views and the provided
* context.
*/
public static InProcessSideInputContainer create(
- InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
- CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
- loader = new CacheLoader<PCollectionViewWindow<?>,
- SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
- @Override
- public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
- PCollectionViewWindow<?> view) {
- return SettableFuture.create();
- }
- };
- LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
- viewByWindows = CacheBuilder.newBuilder().build(loader);
- return new InProcessSideInputContainer(context, containedViews, viewByWindows);
+ final InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+ LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
+ return new InProcessSideInputContainer(containedViews, viewByWindows);
}
- private InProcessSideInputContainer(InProcessEvaluationContext context,
+ private InProcessSideInputContainer(
Collection<PCollectionView<?>> containedViews,
- LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
- viewByWindows) {
- this.evaluationContext = context;
+ LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows) {
this.containedViews = ImmutableSet.copyOf(containedViews);
this.viewByWindows = viewByWindows;
}
@@ -149,29 +137,75 @@ class InProcessSideInputContainer {
private void updatePCollectionViewWindowValues(
PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
- SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
- try {
- future = viewByWindows.get(windowedView);
- if (future.isDone()) {
- Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
- PaneInfo newPane = windowValues.iterator().next().getPane();
- // The current value may have no elements, if no elements were produced for the window,
- // but we are recieving late data.
- if (!existingValues.hasNext()
- || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
- viewByWindows.invalidate(windowedView);
- viewByWindows.get(windowedView).set(windowValues);
- }
- } else {
- future.set(windowValues);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- if (future != null && !future.isDone()) {
- future.set(Collections.<WindowedValue<?>>emptyList());
- }
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
+ AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
+ viewByWindows.getUnchecked(windowedView);
+ if (contents.compareAndSet(null, windowValues)) {
+ // the value had never been set, so we set it and are done.
+ return;
+ }
+ PaneInfo newPane = windowValues.iterator().next().getPane();
+
+ Iterable<? extends WindowedValue<?>> existingValues;
+ long existingPane;
+ do {
+ existingValues = contents.get();
+ existingPane =
+ Iterables.isEmpty(existingValues)
+ ? -1L
+ : existingValues.iterator().next().getPane().getIndex();
+ } while (newPane.getIndex() > existingPane
+ && !contents.compareAndSet(existingValues, windowValues));
+ }
+
+ private static class CallbackSchedulingLoader extends
+ CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
+ private final InProcessEvaluationContext context;
+
+ public CallbackSchedulingLoader(
+ InProcessEvaluationContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public AtomicReference<Iterable<? extends WindowedValue<?>>>
+ load(PCollectionViewWindow<?> view) {
+
+ AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
+ WindowingStrategy<?, ?> windowingStrategy = view.getView().getWindowingStrategyInternal();
+
+ context.scheduleAfterOutputWouldBeProduced(view.getView(),
+ view.getWindow(),
+ windowingStrategy,
+ new WriteEmptyViewContents(view.getView(), view.getWindow(), contents));
+ return contents;
+ }
+ }
+
+ private static class WriteEmptyViewContents implements Runnable {
+ private final PCollectionView<?> view;
+ private final BoundedWindow window;
+ private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;
+
+ private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
+ AtomicReference<Iterable<? extends WindowedValue<?>>> contents) {
+ this.contents = contents;
+ this.view = view;
+ this.window = window;
+ }
+
+ @Override
+ public void run() {
+ // The requested window has closed without producing elements, so reflect that in
+ // the PCollectionView. If set has already been called, will do nothing.
+ contents.compareAndSet(null, Collections.<WindowedValue<?>>emptyList());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("view", view)
+ .add("window", window)
+ .toString();
}
}
@@ -190,43 +224,23 @@ class InProcessSideInputContainer {
+ "Contained views; %s",
view,
readerViews);
- return getViewFuture(view, window).isDone();
+ return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null;
}
@Override
@Nullable
public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
- checkArgument(
- readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
- try {
- final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
- // Safe covariant cast
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
- return view.fromIterableInternal(values);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Gets the future containing the contents of the provided {@link PCollectionView} in the
- * provided {@link BoundedWindow}, setting up a callback to populate the future with empty
- * contents if necessary.
- */
- private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
- final PCollectionView<T> view, final BoundedWindow window) {
- PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
- final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
- viewByWindows.getUnchecked(windowedView);
-
- WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
- evaluationContext.scheduleAfterOutputWouldBeProduced(
- view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
- return future;
+ checkArgument(readerViews.contains(view),
+ "calling get(PCollectionView) with unknown view: " + view);
+ checkArgument(isReady(view, window),
+ "calling get(PCollectionView) with view %s that is not ready in window %s",
+ view,
+ window);
+ // Safe covariant cast
+ @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
+ (Iterable<WindowedValue<?>>) viewByWindows
+ .getUnchecked(PCollectionViewWindow.of(view, window)).get();
+ return view.fromIterableInternal(values);
}
@Override
@@ -240,31 +254,4 @@ class InProcessSideInputContainer {
}
}
- private static class WriteEmptyViewContents implements Runnable {
- private final PCollectionView<?> view;
- private final BoundedWindow window;
- private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;
-
- private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
- SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
- this.future = future;
- this.view = view;
- this.window = window;
- }
-
- @Override
- public void run() {
- // The requested window has closed without producing elements, so reflect that in
- // the PCollectionView. If set has already been called, will do nothing.
- future.set(Collections.<WindowedValue<?>>emptyList());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("view", view)
- .add("window", window)
- .toString();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77475992/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
index 8f89e70..2f376dd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import org.apache.beam.sdk.coders.KvCoder;
@@ -61,8 +62,10 @@ import org.mockito.stubbing.Answer;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* Tests for {@link InProcessSideInputContainer}.
@@ -194,46 +197,12 @@ public class InProcessSideInputContainerTest {
* there is data in the pane.
*/
@Test
- public void getBlocksUntilPaneAvailable() throws Exception {
- BoundedWindow window =
- new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(1024L);
- }
- };
- Future<Double> singletonFuture =
- getFutureOfView(
- container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
- singletonView,
- window);
-
- WindowedValue<Double> singletonValue =
- WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
- assertThat(singletonFuture.isDone(), is(false));
- container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
- assertThat(singletonFuture.get(), equalTo(4.75));
- }
-
- @Test
- public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception {
- BoundedWindow window = new BoundedWindow() {
- @Override
- public Instant maxTimestamp() {
- return new Instant(1024L);
- }
- };
- SideInputReader newReader =
- container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
- Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window);
-
- WindowedValue<Double> singletonValue =
- WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+ public void getNotReadyThrows() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("not ready");
- assertThat(singletonFuture.isDone(), is(false));
- container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue));
- assertThat(singletonFuture.get(), equalTo(24.125));
+ container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+ .get(mapView, GlobalWindow.INSTANCE);
}
@Test
@@ -448,15 +417,20 @@ public class InProcessSideInputContainerTest {
}
@Test
- public void isReadyForEmptyWindowTrue() {
+ public void isReadyForEmptyWindowTrue() throws Exception {
+ CountDownLatch onComplete = new CountDownLatch(1);
immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+ CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete);
ReadyCheckingSideInputReader reader =
container.createReaderForViews(ImmutableList.of(mapView, singletonView));
assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
- immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE);
+ latch.countDown();
+ if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
+ fail("Callback to set empty values did not complete!");
+ }
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
}
@@ -483,6 +457,45 @@ public class InProcessSideInputContainerTest {
Mockito.any(Runnable.class));
}
+ /**
+ * When a callAfterWindowCloses with the specified view's producing transform, window, and
+ * windowing strategy is invoked, start a thread that will invoke the callback after the returned
+ * {@link CountDownLatch} is counted down once.
+ */
+ private CountDownLatch invokeLatchedCallback(
+ PCollectionView<?> view, BoundedWindow window, final CountDownLatch onComplete) {
+ final CountDownLatch runLatch = new CountDownLatch(1);
+ doAnswer(
+ new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Object callback = invocation.getArguments()[3];
+ final Runnable callbackRunnable = (Runnable) callback;
+ Executors.newSingleThreadExecutor().submit(new Runnable() {
+ public void run() {
+ try {
+ if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) {
+ fail("Run latch didn't count down within timeout");
+ }
+ callbackRunnable.run();
+ onComplete.countDown();
+ } catch (InterruptedException e) {
+ fail("Unexpectedly interrupted while waiting for latch to be counted down");
+ }
+ }
+ });
+ return null;
+ }
+ })
+ .when(context)
+ .scheduleAfterOutputWouldBeProduced(
+ Mockito.eq(view),
+ Mockito.eq(window),
+ Mockito.eq(view.getWindowingStrategyInternal()),
+ Mockito.any(Runnable.class));
+ return runLatch;
+ }
+
private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
final PCollectionView<ValueT> view, final BoundedWindow window) {
Callable<ValueT> callable = new Callable<ValueT>() {
[2/2] incubator-beam git commit: This closes #318
Posted by ke...@apache.org.
This closes #318
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be9b1580
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be9b1580
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be9b1580
Branch: refs/heads/master
Commit: be9b1580316595868918be76b360fefa02bc7f13
Parents: 351fc3e 7747599
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 11 10:42:15 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 11 10:42:15 2016 -0700
----------------------------------------------------------------------
.../direct/InProcessSideInputContainer.java | 199 +++++++++----------
.../direct/InProcessSideInputContainerTest.java | 95 +++++----
2 files changed, 147 insertions(+), 147 deletions(-)
----------------------------------------------------------------------