You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/12 01:33:26 UTC
[2/3] incubator-beam git commit: Cache read SideInput Contents in the
InProcessSideInputContainer
Cache read SideInput Contents in the InProcessSideInputContainer
This ensures that while processing a bundle all elements see the same
contents for any SideInput Window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49689fce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49689fce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49689fce
Branch: refs/heads/master
Commit: 49689fced52e29d7efd386202265d0a105fab276
Parents: a25fd05
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 10 13:22:20 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 11 13:01:26 2016 -0700
----------------------------------------------------------------------
.../direct/InProcessSideInputContainer.java | 33 +++++++++++++++-----
.../direct/InProcessEvaluationContextTest.java | 3 ++
.../direct/InProcessSideInputContainerTest.java | 15 +++++++--
3 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
index 1ef8f13..96a9ad2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -211,9 +212,13 @@ class InProcessSideInputContainer {
private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
private final Collection<PCollectionView<?>> readerViews;
+ private final LoadingCache<
+ PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
+ viewContents;
private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
this.readerViews = ImmutableSet.copyOf(readerViews);
+ this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
}
@Override
@@ -224,22 +229,23 @@ class InProcessSideInputContainer {
+ "Contained views; %s",
view,
readerViews);
- return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null;
+ return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
}
@Override
@Nullable
public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
- checkArgument(readerViews.contains(view),
- "calling get(PCollectionView) with unknown view: " + view);
- checkArgument(isReady(view, window),
- "calling get(PCollectionView) with view %s that is not ready in window %s",
+ checkArgument(
+ readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
+ checkArgument(
+ isReady(view, window),
+ "calling get() on a PCollectionView %s that is not ready in window %s",
view,
window);
// Safe covariant cast
@SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
- (Iterable<WindowedValue<?>>) viewByWindows
- .getUnchecked(PCollectionViewWindow.of(view, window)).get();
+ (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
+ window)).get();
return view.fromIterableInternal(values);
}
@@ -254,4 +260,17 @@ class InProcessSideInputContainer {
}
}
+ /**
+ * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
+ * an optional.
+ */
+ private class CurrentViewContentsLoader extends CacheLoader<
+ PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
+
+ @Override
+ public Optional<? extends Iterable<? extends WindowedValue<?>>>
+ load(PCollectionViewWindow<?> key) {
+ return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index b73e41a..10b8721 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -150,6 +150,9 @@ public class InProcessEvaluationContextTest {
WindowedValue.of(
4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
viewWriter.add(Collections.singleton(overrittenSecondValue));
+ assertThat(reader.get(view, second), containsInAnyOrder(2));
+ // The earlier reader keeps serving its cached value; a newly created reader sees the update
+ reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
assertThat(reader.get(view, second), containsInAnyOrder(4444));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
index 2f376dd..746c0f8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -399,7 +399,8 @@ public class InProcessSideInputContainerTest {
FIRST_WINDOW.maxTimestamp().minus(100L),
FIRST_WINDOW,
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
- assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+ // Cached value is false
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
container.write(
singletonView,
@@ -410,10 +411,15 @@ public class InProcessSideInputContainerTest {
SECOND_WINDOW,
PaneInfo.ON_TIME_AND_ONLY_FIRING)));
assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
- assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+ reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+ assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+ assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
}
@Test
@@ -431,6 +437,11 @@ public class InProcessSideInputContainerTest {
if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
fail("Callback to set empty values did not complete!");
}
+ // The cached value was false, so it continues to be false
+ assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+ // A new reader for the same container gets a fresh look
+ reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
}