Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/08 19:45:39 UTC

[1/2] incubator-beam git commit: Closes #340

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7fb21f223 -> 61d8cf2c4


Closes #340


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/61d8cf2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/61d8cf2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/61d8cf2c

Branch: refs/heads/master
Commit: 61d8cf2c48c2c5bd6cbdd8c2755cac5be4b6777c
Parents: 7fb21f2 269da95
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 8 12:45:31 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 8 12:45:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/UnboundedSource.java | 47 +++++++++++++++-----
 1 file changed, 35 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Improve UnboundedSource javadoc

Posted by dh...@apache.org.
Improve UnboundedSource javadoc
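The revised CheckpointMark javadoc in the diff implies a particular shape for an acknowledging checkpoint. It should snapshot what it needs at the moment it is taken. It should hold no reference back to the reader. It should also tolerate being finalized from another thread after later checkpoints already exist. Below is a minimal, self-contained Java sketch of that shape under stated assumptions. It does not use the Beam SDK: the CheckpointMark interface here is a local stand-in that only mirrors finalizeCheckpoint(), and Acker, AckingCheckpointMark and Reader are hypothetical names for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CheckpointDemo {
  /** Local stand-in mirroring the shape of Beam's CheckpointMark (not the Beam type itself). */
  interface CheckpointMark {
    void finalizeCheckpoint() throws IOException;
  }

  /** Hypothetical client for an external queue that accepts acknowledgements. */
  interface Acker {
    void ack(List<String> messageIds) throws IOException;
  }

  /** Snapshot of the IDs read since the previous checkpoint; safe to finalize from any thread. */
  static final class AckingCheckpointMark implements CheckpointMark {
    private final List<String> idsToAck; // immutable copy, never shared with the reader
    private final Acker acker;           // no reference back to the reader, which may be gone

    AckingCheckpointMark(List<String> idsToAck, Acker acker) {
      this.idsToAck = Collections.unmodifiableList(new ArrayList<>(idsToAck));
      this.acker = acker;
    }

    @Override
    public void finalizeCheckpoint() throws IOException {
      // Per the javadoc, throwing here would cause the whole checkpoint to be abandoned.
      acker.ack(idsToAck);
    }
  }

  /** Minimal reader model: buffers message IDs until a checkpoint is taken. */
  static final class Reader {
    private final List<String> readSinceLastCheckpoint = new ArrayList<>();
    private final Acker acker;

    Reader(Acker acker) {
      this.acker = acker;
    }

    void read(String messageId) {
      readSinceLastCheckpoint.add(messageId);
    }

    CheckpointMark getCheckpointMark() {
      CheckpointMark mark = new AckingCheckpointMark(readSinceLastCheckpoint, acker);
      readSinceLastCheckpoint.clear(); // the next checkpoint covers only newer records
      return mark;
    }
  }

  public static void main(String[] args) throws IOException {
    List<String> acked = new CopyOnWriteArrayList<>();
    Reader reader = new Reader(acked::addAll);
    reader.read("m1");
    reader.read("m2");
    CheckpointMark first = reader.getCheckpointMark();
    reader.read("m3");
    CheckpointMark second = reader.getCheckpointMark(); // taken before `first` is finalized
    first.finalizeCheckpoint();  // finalized in the order the checkpoints were taken
    second.finalizeCheckpoint();
    System.out.println(acked);   // [m1, m2, m3]
  }
}
```

Copying the IDs into an unmodifiable list when the mark is created is what lets finalizeCheckpoint() run concurrently with the reader, or after the reader no longer exists, without extra locking.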


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/269da952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/269da952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/269da952

Branch: refs/heads/master
Commit: 269da95272e8d6d35d3a7e6081b6059ca1bc2e37
Parents: 7fb21f2
Author: Mark Shields <ma...@google.com>
Authored: Mon May 16 12:56:53 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 8 12:45:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/UnboundedSource.java | 47 +++++++++++++++-----
 1 file changed, 35 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
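Taken together, the javadoc for start() and getCheckpointMark() defines what "read" means on restore. A checkpoint covers every record that getCurrent() could have returned, and a reader restored from it should start at the first record after those. The following is a minimal sketch in plain Java without the Beam SDK, assuming a hypothetical in-memory log addressed by integer offsets. OffsetCheckpoint and OffsetReader are illustrative names, not Beam classes. A real Beam reader would also track watermarks and would serialize its checkpoint with getCheckpointMarkCoder(). Both are omitted here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

public class RestoreDemo {
  /** Hypothetical checkpoint: the offset of the first record that has NOT been read yet. */
  static final class OffsetCheckpoint {
    final int nextOffset;

    OffsetCheckpoint(int nextOffset) {
      this.nextOffset = nextOffset;
    }
  }

  /** Reader over an in-memory log, modelling the start()/advance()/getCurrent() contract. */
  static final class OffsetReader {
    private final List<String> log;
    private int position;              // offset of the most recently read record, -1 if none
    private boolean hasCurrent = false;
    private boolean started = false;

    OffsetReader(List<String> log, OffsetCheckpoint restoreFrom) {
      this.log = log;
      // Sit just before the next unread record, so start() lands exactly on it.
      this.position = (restoreFrom == null ? 0 : restoreFrom.nextOffset) - 1;
    }

    /** Must be called exactly once, before advance() or getCurrent(). */
    boolean start() {
      if (started) {
        throw new IllegalStateException("start() called more than once");
      }
      started = true;
      return advance();
    }

    /** Moves to the next record; returns false if none is available right now. */
    boolean advance() {
      if (position + 1 >= log.size()) {
        hasCurrent = false;
        return false;
      }
      position++;
      hasCurrent = true;
      return true;
    }

    String getCurrent() {
      if (!hasCurrent) {
        throw new NoSuchElementException();
      }
      return log.get(position);
    }

    /** Every record up to and including the current one counts as read. */
    OffsetCheckpoint getCheckpointMark() {
      return new OffsetCheckpoint(position + 1);
    }
  }

  public static void main(String[] args) {
    List<String> log = Arrays.asList("a", "b", "c", "d");
    OffsetReader first = new OffsetReader(log, null);
    first.start();   // current record: "a"
    first.advance(); // current record: "b"
    OffsetCheckpoint checkpoint = first.getCheckpointMark(); // nextOffset == 2

    // In Beam the mark would round-trip through getCheckpointMarkCoder(); here it is reused directly.
    OffsetReader restored = new OffsetReader(log, checkpoint);
    restored.start();
    System.out.println(restored.getCurrent()); // c
  }
}
```

Storing the offset of the next unread record, rather than the last read one, keeps the restore logic identical for a fresh reader (offset 0) and a restored one.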


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269da952/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index 82c8db7..2c4a325 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -104,14 +104,34 @@ public abstract class UnboundedSource<
    */
   public interface CheckpointMark {
     /**
-     * Perform any finalization that needs to happen after a bundle of data read from
-     * the source has been processed and committed.
+     * Called by the system to signal that this checkpoint mark has been committed along with
+     * all the records which have been read from the {@link UnboundedReader} since the
+     * previous finalized checkpoint was taken.
      *
-     * <p>For example, this could be sending acknowledgement requests to an external
-     * data source such as Pub/Sub.
+     * <p>For example, this method could send acknowledgements to an external data source
+     * such as Pubsub.
      *
-     * <p>This may be called from any thread, potentially at the same time as calls to the
-     * {@code UnboundedReader} that created it.
+     * <p>Note that:
+     * <ul>
+     * <li>This finalize method may be called from any thread, concurrently with calls to
+     * the {@link UnboundedReader} it was created from.
+     * <li>Checkpoints will not necessarily be finalized as soon as they are created.
+     * A checkpoint may be taken while a previous checkpoint from the same
+     * {@link UnboundedReader} has not yet been finalized.
+     * <li>In the absence of failures, all checkpoints will be finalized and they will be
+     * finalized in the same order they were taken from the {@link UnboundedReader}.
+     * <li>It is possible for a checkpoint to be taken but this method never called if
+     * the checkpoint could not be committed for any reason.
+     * <li>If this call throws an exception then the entire checkpoint will be abandoned and the
+     * reader restarted from an earlier, successfully-finalized checkpoint.
+     * <li>If a checkpoint fails for any reason then no later checkpoint will be allowed to be
+     * finalized without the reader first being restarted.
+     * <li>If an {@link UnboundedReader} is restarted from an earlier checkpoint, the checkpoint
+     * instance will be deserialized from the serialized form of the earlier checkpoint using
+     * {@link UnboundedSource#getCheckpointMarkCoder()}.
+     * <li>It is not safe to assume the {@link UnboundedReader} from which this checkpoint was
+     * created still exists at the time this method is called.
+     * </ul>
      */
     void finalizeCheckpoint() throws IOException;
   }
@@ -126,9 +146,11 @@ public abstract class UnboundedSource<
     private static final byte[] EMPTY = new byte[0];
 
     /**
-     * Initializes the reader and advances the reader to the first record.
+     * Initializes the reader and advances the reader to the first record. If the reader has been
+     * restored from a checkpoint then it should advance to the next unread record at the point
+     * the checkpoint was taken.
      *
-     * <p>This method should be called exactly once. The invocation should occur prior to calling
+     * <p>This method will be called exactly once. The invocation will occur prior to calling
      * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
      * are needed to initialize the reader.
      *
@@ -205,10 +227,11 @@ public abstract class UnboundedSource<
     /**
      * Returns a {@link CheckpointMark} representing the progress of this {@code UnboundedReader}.
      *
-     * <p>The elements read up until this is called will be processed together as a bundle. Once
-     * the result of this processing has been durably committed,
-     * {@link CheckpointMark#finalizeCheckpoint} will be called on the {@link CheckpointMark}
-     * object.
+     * <p>All elements read up until this method is called will be processed together as a bundle.
+     * (An element is considered 'read' if it could be returned by a call to {@link #getCurrent}.)
+     * Once the result of processing those elements and the returned checkpoint have been durably
+     * committed, {@link CheckpointMark#finalizeCheckpoint} will (eventually) be called on the
+     * returned {@link CheckpointMark} object.
      *
      * <p>The returned object should not be modified.
      *