You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/12 01:33:26 UTC

[2/3] incubator-beam git commit: Cache read SideInput Contents in the InProcessSideInputContainer

Cache read SideInput Contents in the InProcessSideInputContainer

This ensures that while processing a bundle all elements see the same
contents for any SideInput Window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49689fce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49689fce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49689fce

Branch: refs/heads/master
Commit: 49689fced52e29d7efd386202265d0a105fab276
Parents: a25fd05
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 10 13:22:20 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 11 13:01:26 2016 -0700

----------------------------------------------------------------------
 .../direct/InProcessSideInputContainer.java     | 33 +++++++++++++++-----
 .../direct/InProcessEvaluationContextTest.java  |  3 ++
 .../direct/InProcessSideInputContainerTest.java | 15 +++++++--
 3 files changed, 42 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
index 1ef8f13..96a9ad2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -211,9 +212,13 @@ class InProcessSideInputContainer {
 
   private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
     private final Collection<PCollectionView<?>> readerViews;
+    private final LoadingCache<
+        PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>>
+        viewContents;
 
     private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
       this.readerViews = ImmutableSet.copyOf(readerViews);
+      this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
     }
 
     @Override
@@ -224,22 +229,23 @@ class InProcessSideInputContainer {
               + "Contained views; %s",
           view,
           readerViews);
-      return viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window)).get() != null;
+      return viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).isPresent();
     }
 
     @Override
     @Nullable
     public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-      checkArgument(readerViews.contains(view),
-          "calling get(PCollectionView) with unknown view: " + view);
-      checkArgument(isReady(view, window),
-          "calling get(PCollectionView) with view %s that is not ready in window %s",
+      checkArgument(
+          readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
+      checkArgument(
+          isReady(view, window),
+          "calling get() on a PCollectionView %s that is not ready in window %s",
           view,
           window);
       // Safe covariant cast
       @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
-          (Iterable<WindowedValue<?>>) viewByWindows
-              .getUnchecked(PCollectionViewWindow.of(view, window)).get();
+          (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
+              window)).get();
       return view.fromIterableInternal(values);
     }
 
@@ -254,4 +260,17 @@ class InProcessSideInputContainer {
     }
   }
 
+  /**
+   * A {@link CacheLoader} that loads the current contents of a {@link PCollectionViewWindow} into
+   * an optional.
+   */
+  private class CurrentViewContentsLoader extends CacheLoader<
+      PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
+
+    @Override
+    public Optional<? extends Iterable<? extends WindowedValue<?>>>
+        load(PCollectionViewWindow<?> key) {
+      return Optional.fromNullable(viewByWindows.getUnchecked(key).get());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index b73e41a..10b8721 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -150,6 +150,9 @@ public class InProcessEvaluationContextTest {
         WindowedValue.of(
             4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
     viewWriter.add(Collections.singleton(overrittenSecondValue));
+    assertThat(reader.get(view, second), containsInAnyOrder(2));
+    // The earlier reader served its cached value above; a new reader observes the new contents
+    reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
     assertThat(reader.get(view, second), containsInAnyOrder(4444));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49689fce/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
index 2f376dd..746c0f8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java
@@ -399,7 +399,8 @@ public class InProcessSideInputContainerTest {
                 FIRST_WINDOW.maxTimestamp().minus(100L),
                 FIRST_WINDOW,
                 PaneInfo.ON_TIME_AND_ONLY_FIRING)));
-    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+    // Cached value is false
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
 
     container.write(
         singletonView,
@@ -410,10 +411,15 @@ public class InProcessSideInputContainerTest {
                 SECOND_WINDOW,
                 PaneInfo.ON_TIME_AND_ONLY_FIRING)));
     assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
-    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
 
     assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
     assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+    reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
   }
 
   @Test
@@ -431,6 +437,11 @@ public class InProcessSideInputContainerTest {
     if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) {
       fail("Callback to set empty values did not complete!");
     }
+    // The cached value was false, so it continues to be false
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+    // A new reader for the same container gets a fresh look
+    reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView));
     assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
   }