Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/07 20:36:06 UTC

[1/2] incubator-beam git commit: Allow DoFns to be Reused

Repository: incubator-beam
Updated Branches:
  refs/heads/master fcf5bc71d -> 2173000c3


Allow DoFns to be Reused

Modify the documentation of ParDo to allow DoFns to be reused for
multiple bundles in the general case. This allows DoFns which implement
start and finish bundle to be reused.

Update ParDoTest.TestDoFn to succeed if reused.
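
For readers of this change, a minimal sketch of what reuse means for
per-bundle state. It assumes a plain Java stand-in rather than the real
DoFn API: the class BatchingFnSketch, its buffer, and the flushed() list
are hypothetical names used only for illustration. The point is that
anything meant to live for a single bundle is reset in startBundle,
because the same instance may see several bundles.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a DoFn that batches elements; not the Beam API. */
final class BatchingFnSketch {
  // Per-bundle state: must not leak from one bundle into the next.
  private final List<String> buffer = new ArrayList<>();
  // Stand-in for emitted output, kept only so the sketch can be inspected.
  private final List<String> flushed = new ArrayList<>();

  void startBundle() {
    // The instance may have processed an earlier bundle, so reset here
    // instead of relying on a freshly deserialized copy.
    buffer.clear();
  }

  void processElement(String element) {
    buffer.add(element);
  }

  void finishBundle() {
    // Emit one batch per bundle.
    flushed.add(String.join(",", buffer));
  }

  List<String> flushed() {
    return flushed;
  }
}
```

Calling startBundle, processElement, and finishBundle twice on one
instance yields one batch per bundle, with no elements carried over.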


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e60c70a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e60c70a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e60c70a

Branch: refs/heads/master
Commit: 5e60c70a93cdc0855f51681c1fa1724380f7661a
Parents: 0e62c29
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 6 11:52:58 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jun 6 11:52:58 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 31 ++++++++++----------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  3 +-
 2 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e60c70a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 834f60d..511f0d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -87,11 +88,11 @@ import javax.annotation.Nullable;
  * For each bundle of input elements processing proceeds as follows:
  *
  * <ol>
- *   <li>A fresh instance of the argument {@link DoFn} is created on a worker. This may
- *     be through deserialization or other means. If the {@link DoFn} subclass
- *     does not override {@link DoFn#startBundle startBundle} or
- *     {@link DoFn#finishBundle finishBundle} then this may be optimized since
- *     it cannot observe the start and end of a bundle.</li>
+ *   <li>If required, a fresh instance of the argument {@link DoFn} is created
+ *     on a worker. This may be through deserialization or other means. A
+ *     {@link PipelineRunner} may reuse {@link DoFn} instances for multiple bundles.
+ *     A {@link DoFn} that has terminated abnormally (by throwing an {@link Exception})
+ *     will never be reused.</li>
  *   <li>The {@link DoFn DoFn's} {@link DoFn#startBundle} method is called to
  *     initialize it. If this method is not overridden, the call may be optimized
  *     away.</li>
@@ -99,7 +100,8 @@ import javax.annotation.Nullable;
  *     is called on each of the input elements in the bundle.</li>
  *   <li>The {@link DoFn DoFn's} {@link DoFn#finishBundle} method is called
  *     to complete its work. After {@link DoFn#finishBundle} is called, the
- *     framework will never again invoke any of these three processing methods.
+ *     framework will not again invoke {@link DoFn#processElement} or {@link DoFn#finishBundle}
+ *     until a new call to {@link DoFn#startBundle} has occurred.
  *     If this method is not overridden, this call may be optimized away.</li>
  * </ol>
  *
@@ -309,17 +311,12 @@ import javax.annotation.Nullable;
  * <p>A {@link DoFn} passed to a {@link ParDo} transform must be
  * {@link Serializable}. This allows the {@link DoFn} instance
  * created in this "main program" to be sent (in serialized form) to
- * remote worker machines and reconstituted for each bundles of elements
+ * remote worker machines and reconstituted for bundles of elements
  * of the input {@link PCollection} being processed. A {@link DoFn}
  * can have instance variable state, and non-transient instance
  * variable state will be serialized in the main program and then
- * deserialized on remote worker machines for each bundle of elements
- * to process.
- *
- * <p>To aid in ensuring that {@link DoFn DoFns} are properly
- * {@link Serializable}, even local execution using the
- * {@link DirectPipelineRunner} will serialize and then deserialize
- * {@link DoFn DoFns} before executing them on a bundle.
+ * deserialized on remote worker machines for some number of bundles
+ * of elements to process.
  *
  * <p>{@link DoFn DoFns} expressed as anonymous inner classes can be
  * convenient, but due to a quirk in Java's rules for serializability,
@@ -360,9 +357,11 @@ import javax.annotation.Nullable;
  * class), initialized by the {@link DoFn}'s constructor (which is
  * implicit for an anonymous inner class). This state will be
  * automatically serialized and then deserialized in the {@code DoFn}
- * instance created for each bundle. This method is good for state
+ * instances created for bundles. This method is good for state
  * known when the original {@code DoFn} is created in the main
- * program, if it's not overly large.
+ * program, if it's not overly large. This is not suitable for any
+ * state which must only be used for a single bundle, as {@link DoFn DoFns}
+ * may be used to process multiple bundles.
  *
  * <li>Compute the state as a singleton {@link PCollection} and pass it
  * in as a side input to the {@link DoFn}. This is good if the state

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e60c70a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index c0c8051..5f0f8ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -121,7 +121,8 @@ public class ParDoTest implements Serializable {
 
     @Override
     public void startBundle(Context c) {
-      assertEquals(State.UNSTARTED, state);
+      // The Fn can be reused, but only if FinishBundle has been called.
+      assertThat(state, anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
       state = State.STARTED;
       outputToAll(c, "started");
     }
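
The relaxed assertion above can be read as a small state machine. The
following is a sketch only, assuming a hypothetical LifecycleSketch class
in plain Java rather than the actual ParDoTest.TestDoFn: startBundle is
accepted from UNSTARTED or FINISHED, while processElement and
finishBundle are accepted only while a bundle is in progress.

```java
/** Hypothetical model of the bundle lifecycle; not the Beam API. */
final class LifecycleSketch {
  enum State { UNSTARTED, STARTED, FINISHED }

  private State state = State.UNSTARTED;
  private int bundlesCompleted = 0;

  void startBundle() {
    // Reuse is allowed, but only once the previous bundle has finished.
    if (state == State.STARTED) {
      throw new IllegalStateException("startBundle called while a bundle is in progress");
    }
    state = State.STARTED;
  }

  void processElement(String element) {
    requireStarted("processElement");
  }

  void finishBundle() {
    requireStarted("finishBundle");
    state = State.FINISHED;
    bundlesCompleted++;
  }

  int bundlesCompleted() {
    return bundlesCompleted;
  }

  private void requireStarted(String method) {
    if (state != State.STARTED) {
      throw new IllegalStateException(method + " called in state " + state);
    }
  }
}
```

A runner-like caller may cycle one instance through startBundle,
processElement, and finishBundle repeatedly; any call made out of order
fails fast instead of silently corrupting state.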


[2/2] incubator-beam git commit: This closes #419

Posted by ke...@apache.org.
This closes #419


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2173000c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2173000c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2173000c

Branch: refs/heads/master
Commit: 2173000c3cc900e2430b6342856a078de424f593
Parents: fcf5bc7 5e60c70
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 7 13:35:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 7 13:35:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   | 31 ++++++++++----------
 .../apache/beam/sdk/transforms/ParDoTest.java   |  3 +-
 2 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------