You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/06/14 17:21:13 UTC

[1/2] beam git commit: This closes #3353

Repository: beam
Updated Branches:
  refs/heads/master eebff9089 -> dd9abc397


This closes #3353


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd9abc39
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd9abc39
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd9abc39

Branch: refs/heads/master
Commit: dd9abc397c67b47f95414bfd5debb08b7b6a6eaf
Parents: eebff90 8a850af
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 14 10:21:02 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 14 10:21:02 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    | 12 ++++-----
 .../beam/runners/direct/EvaluationContext.java  | 26 +++++++++++++++-----
 .../direct/ExecutorServiceParallelExecutor.java |  9 ++++---
 .../beam/runners/direct/WatermarkManager.java   |  4 +--
 .../runners/direct/CommittedResultTest.java     | 17 +++++++------
 .../runners/direct/TransformExecutorTest.java   | 11 +++++++--
 .../runners/direct/WatermarkManagerTest.java    | 15 ++++++++---
 7 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Do not produce Unprocessed Inputs if all inputs were Processed

Posted by tg...@apache.org.
Do not produce Unprocessed Inputs if all inputs were Processed

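This stops the WatermarkManager "Pending Bundles" from growing without
bound. Previously, whenever an input bundle was present, the
WatermarkManager added the unprocessed-inputs bundle to the transform's
pending bundles even when that bundle contained no elements. The executor
only rescheduled non-empty unprocessed inputs, so these empty bundles were
never processed and accumulated in the pending set. Now an unprocessed-inputs
bundle is produced (as a present Optional) only when some input elements
were actually left unprocessed.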


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8a850af3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8a850af3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8a850af3

Branch: refs/heads/master
Commit: 8a850af3304a48618739dc23e286800dc0c4641a
Parents: eebff90
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 13 12:50:58 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 14 10:21:02 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    | 12 ++++-----
 .../beam/runners/direct/EvaluationContext.java  | 26 +++++++++++++++-----
 .../direct/ExecutorServiceParallelExecutor.java |  9 ++++---
 .../beam/runners/direct/WatermarkManager.java   |  4 +--
 .../runners/direct/CommittedResultTest.java     | 17 +++++++------
 .../runners/direct/TransformExecutorTest.java   | 11 +++++++--
 .../runners/direct/WatermarkManagerTest.java    | 15 ++++++++---
 7 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 8c45449..70e3ac3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -19,8 +19,8 @@
 package org.apache.beam.runners.direct;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 
@@ -36,12 +36,10 @@ abstract class CommittedResult {
 
   /**
    * Returns the {@link CommittedBundle} that contains the input elements that could not be
-   * processed by the evaluation.
-   *
-   * <p>{@code null} if the input bundle was null.
+   * processed by the evaluation. The returned optional is present if there were any unprocessed
+   * input elements, and absent otherwise.
    */
-  @Nullable
-  public abstract CommittedBundle<?> getUnprocessedInputs();
+  public abstract Optional<? extends CommittedBundle<?>> getUnprocessedInputs();
 
   /**
    * Returns the outputs produced by the transform.
@@ -59,7 +57,7 @@ abstract class CommittedResult {
 
   public static CommittedResult create(
       TransformResult<?> original,
-      CommittedBundle<?> unprocessedElements,
+      Optional<? extends CommittedBundle<?>> unprocessedElements,
       Iterable<? extends CommittedBundle<?>> outputs,
       Set<OutputType> producedOutputs) {
     return new AutoValue_CommittedResult(original.getTransform(),

http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index e215070..d192785 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -158,12 +159,9 @@ class EvaluationContext {
     } else {
       outputTypes.add(OutputType.BUNDLE);
     }
-    CommittedResult committedResult = CommittedResult.create(result,
-        completedBundle == null
-            ? null
-            : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
-        committedBundles,
-        outputTypes);
+    CommittedResult committedResult =
+        CommittedResult.create(
+            result, getUnprocessedInput(completedBundle, result), committedBundles, outputTypes);
     // Update state internals
     CopyOnAccessInMemoryStateInternals theirState = result.getState();
     if (theirState != null) {
@@ -187,6 +185,22 @@ class EvaluationContext {
     return committedResult;
   }
 
+  /**
+   * Returns an {@link Optional} containing a bundle which contains all of the unprocessed elements
+   * that were not processed from the {@code completedBundle}. If all of the elements of the {@code
+   * completedBundle} were processed, or if {@code completedBundle} is null, returns an absent
+   * {@link Optional}.
+   */
+  private Optional<? extends CommittedBundle<?>> getUnprocessedInput(
+      @Nullable CommittedBundle<?> completedBundle, TransformResult<?> result) {
+    if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) {
+      return Optional.absent();
+    }
+    CommittedBundle<?> residual =
+        completedBundle.withElements((Iterable) result.getUnprocessedElements());
+    return Optional.of(residual);
+  }
+
   private Iterable<? extends CommittedBundle<?>> commitBundles(
       Iterable<? extends UncommittedBundle<?>> bundles) {
     ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();

http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 6fe8ebd..2f4d1f6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -357,15 +357,16 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
             ExecutorUpdate.fromBundle(
                 outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection())));
       }
-      CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
-      if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
+      Optional<? extends CommittedBundle<?>> unprocessedInputs =
+          committedResult.getUnprocessedInputs();
+      if (unprocessedInputs.isPresent()) {
         if (inputBundle.getPCollection() == null) {
           // TODO: Split this logic out of an if statement
-          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs);
+          pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get());
         } else {
           allUpdates.offer(
               ExecutorUpdate.fromBundle(
-                  unprocessedInputs,
+                  unprocessedInputs.get(),
                   Collections.<AppliedPTransform<?, ?, ?>>singleton(
                       committedResult.getTransform())));
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 80a3504..599b74f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -994,9 +994,9 @@ class WatermarkManager {
     }
 
     TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
-    if (input != null) {
+    if (result.getUnprocessedInputs().isPresent()) {
       // Add the unprocessed inputs
-      completedTransform.addPending(result.getUnprocessedInputs());
+      completedTransform.addPending(result.getUnprocessedInputs().get());
     }
     completedTransform.updateTimers(timerUpdate);
     if (input != null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index cf19dc2..8b95b34 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.beam.runners.direct;
 
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.Collections;
@@ -72,7 +72,7 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            bundleFactory.createBundle(created).commit(Instant.now()),
+            Optional.<CommittedBundle<?>>absent(),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
@@ -88,11 +88,11 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            bundle,
+            Optional.of(bundle),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
-    assertThat(result.getUnprocessedInputs(),
+    assertThat(result.getUnprocessedInputs().get(),
         Matchers.<CommittedBundle<?>>equalTo(bundle));
   }
 
@@ -101,11 +101,14 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            null,
+            Optional.<CommittedBundle<?>>absent(),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.noneOf(OutputType.class));
 
-    assertThat(result.getUnprocessedInputs(), nullValue());
+    assertThat(
+        result.getUnprocessedInputs(),
+        Matchers.<Optional<? extends CommittedBundle<?>>>equalTo(
+            Optional.<CommittedBundle<?>>absent()));
   }
 
   @Test
@@ -120,7 +123,7 @@ public class CommittedResultTest implements Serializable {
     CommittedResult result =
         CommittedResult.create(
             StepTransformResult.withoutHold(transform).build(),
-            bundleFactory.createBundle(created).commit(Instant.now()),
+            Optional.<CommittedBundle<?>>absent(),
             outputs,
             EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 3dd4028..b7f5a7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -25,6 +25,8 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -415,8 +417,13 @@ public class TransformExecutorTest {
               ? Collections.emptyList()
               : result.getUnprocessedElements();
 
-      CommittedBundle<?> unprocessedBundle =
-          inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
+      Optional<? extends CommittedBundle<?>> unprocessedBundle;
+      if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) {
+        unprocessedBundle = Optional.absent();
+      } else {
+        unprocessedBundle =
+            Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements));
+      }
       return CommittedResult.create(
           result,
           unprocessedBundle,

http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index e0b5251..e3f6215 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -318,7 +319,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.empty(),
         CommittedResult.create(
             StepTransformResult.withoutHold(graph.getProducer(created)).build(),
-            root.withElements(Collections.<WindowedValue<Void>>emptyList()),
+            Optional.<CommittedBundle<?>>absent(),
             Collections.singleton(createBundle),
             EnumSet.allOf(OutputType.class)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -332,7 +333,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.empty(),
         CommittedResult.create(
             StepTransformResult.withoutHold(theFlatten).build(),
-            createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Optional.<CommittedBundle<?>>absent(),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.allOf(OutputType.class)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -345,7 +346,7 @@ public class WatermarkManagerTest implements Serializable {
         TimerUpdate.empty(),
         CommittedResult.create(
             StepTransformResult.withoutHold(theFlatten).build(),
-            createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+            Optional.<CommittedBundle<?>>absent(),
             Collections.<CommittedBundle<?>>emptyList(),
             EnumSet.allOf(OutputType.class)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1501,9 +1502,15 @@ public class WatermarkManagerTest implements Serializable {
       AppliedPTransform<?, ?, ?> transform,
       @Nullable CommittedBundle<?> unprocessedBundle,
       Iterable<? extends CommittedBundle<?>> bundles) {
+    Optional<? extends CommittedBundle<?>> unprocessedElements;
+    if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) {
+      unprocessedElements = Optional.absent();
+    } else {
+      unprocessedElements = Optional.of(unprocessedBundle);
+    }
     return CommittedResult.create(
         StepTransformResult.withoutHold(transform).build(),
-        unprocessedBundle,
+        unprocessedElements,
         bundles,
         Iterables.isEmpty(bundles)
             ? EnumSet.noneOf(OutputType.class)