Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/29 19:37:48 UTC

[2/2] incubator-beam git commit: Update Checkpoint Documentation

Update Checkpoint Documentation

Checkpoint finalization is best effort. A checkpoint that has been
committed to durable state may be reused to start a reader, regardless
of whether it has been finalized.

Note that sources whose checkpoints have an effect on the underlying
system (via finalize) should generally require deduping, because
checkpoint finalization can fail arbitrarily.
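The failure mode described above can be illustrated with a small, self-contained simulation. It does not use Beam: PullAckQueue, AckingCheckpoint, and run() are hypothetical stand-ins for a pull-ack source, a CheckpointMark whose finalizeCheckpoint() acknowledges messages, and a runner that resumes a reader from a checkpoint that was committed but never finalized.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class FinalizationDemo {

  /** Hypothetical pull-ack queue: a message is redelivered until it is acknowledged. */
  static final class PullAckQueue {
    private final Set<String> unacked = new LinkedHashSet<>();

    void publish(String id) { unacked.add(id); }

    /** Returns every unacknowledged message, including ones delivered earlier. */
    List<String> pull() { return new ArrayList<>(unacked); }

    void ack(Collection<String> ids) { unacked.removeAll(ids); }
  }

  /** Hypothetical checkpoint mark: finalizing it acknowledges the messages it covers. */
  static final class AckingCheckpoint {
    private final PullAckQueue queue;
    private final List<String> idsToAck;

    AckingCheckpoint(PullAckQueue queue, List<String> idsToAck) {
      this.queue = queue;
      this.idsToAck = idsToAck;
    }

    void finalizeCheckpoint() { queue.ack(idsToAck); }
  }

  /**
   * Runs two reader sessions. The first checkpoint is committed but never finalized,
   * so the second session (resumed from it) receives m1..m3 again.
   * Returns everything emitted downstream, with or without deduping by record ID.
   */
  static List<String> run(boolean dedupe) {
    PullAckQueue queue = new PullAckQueue();
    queue.publish("m1");
    queue.publish("m2");
    queue.publish("m3");

    List<String> output = new ArrayList<>();
    Set<String> seenIds = new HashSet<>();

    // Session 1: read and commit; finalizeCheckpoint() is never called on this mark.
    List<String> batch1 = queue.pull();
    emit(batch1, dedupe, seenIds, output);
    AckingCheckpoint committedButNotFinalized = new AckingCheckpoint(queue, batch1);

    queue.publish("m4");

    // Session 2: resumed from the unfinalized checkpoint, so m1..m3 are redelivered.
    List<String> batch2 = queue.pull();
    emit(batch2, dedupe, seenIds, output);
    new AckingCheckpoint(queue, batch2).finalizeCheckpoint();

    return output;
  }

  private static void emit(List<String> batch, boolean dedupe, Set<String> seenIds,
      List<String> out) {
    for (String id : batch) {
      // With deduping on, an element is dropped if its record ID was already emitted.
      if (!dedupe || seenIds.add(id)) {
        out.add(id);
      }
    }
  }

  public static void main(String[] args) {
    System.out.println("without deduping: " + run(false));
    System.out.println("with deduping:    " + run(true));
  }
}
```

Running main prints [m1, m2, m3, m1, m2, m3, m4] without deduping and [m1, m2, m3, m4] with it: because the first checkpoint's finalization never ran, the queue had no reason to stop redelivering m1..m3.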


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a2f639d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a2f639d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a2f639d

Branch: refs/heads/master
Commit: 2a2f639d5f5aae6e0ed03448531cb15aa2eca706
Parents: 97bcfec
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 20 13:48:17 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 29 12:37:30 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/UnboundedSource.java | 26 +++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a2f639d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index ea3004e..dded8e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -91,6 +91,10 @@ public abstract class UnboundedSource<
    * <p>This is needed if the underlying data source can return the same record multiple times,
    * such a queuing system with a pull-ack model.  Sources where the records read are uniquely
    * identified by the persisted state in the CheckpointMark do not need this.
+   *
+   * <p>Generally, if {@link CheckpointMark#finalizeCheckpoint()} is overridden, this method should
+   * return true. Checkpoint finalization is best-effort, and readers can be resumed from a
+   * checkpoint that has not been finalized.
    */
   public boolean requiresDeduping() {
     return false;
@@ -106,7 +110,7 @@ public abstract class UnboundedSource<
     /**
      * Called by the system to signal that this checkpoint mark has been committed along with
      * all the records which have been read from the {@link UnboundedReader} since the
-     * previous finalized checkpoint was taken.
+     * previous checkpoint was taken.
      *
      * <p>For example, this method could send acknowledgements to an external data source
      * such as Pubsub.
@@ -120,15 +124,9 @@ public abstract class UnboundedSource<
      * {@link UnboundedReader} has not yet be finalized.
      * <li>In the absence of failures, all checkpoints will be finalized and they will be
      * finalized in the same order they were taken from the {@link UnboundedReader}.
-     * <li>It is possible for a checkpoint to be taken but this method never called if
-     * the checkpoint could not be committed for any reason.
-     * <li>If this call throws an exception then the entire checkpoint will be abandoned and the
-     * reader restarted from an earlier, successfully-finalized checkpoint.
-     * <li>If a checkpoint fails for any reason then no later checkpoint will be allowed to be
-     * finalized without the reader first being restarted.
-     * <li>If an {@link UnboundedReader} is restarted from an earlier checkpoint, the checkpoint
-     * instance will be deserialized from the serialized form of the earlier checkpoint using
-     * {@link UnboundedSource#getCheckpointMarkCoder()}.
+     * <li>It is possible for a checkpoint to be taken but this method never called. This method
+     * will never be called if the checkpoint could not be committed, and other failures may cause
+     * this method to never be called.
      * <li>It is not safe to assume the {@link UnboundedReader} from which this checkpoint was
      * created still exists at the time this method is called.
      * </ul>
@@ -230,8 +228,12 @@ public abstract class UnboundedSource<
      * <p>All elements read up until this method is called will be processed together as a bundle.
      * (An element is considered 'read' if it could be returned by a call to {@link #getCurrent}.)
      * Once the result of processing those elements and the returned checkpoint have been durably
-     * committed, {@link CheckpointMark#finalizeCheckpoint} will (eventually) be called on the
-     * returned {@link CheckpointMark} object.
+     * committed, {@link CheckpointMark#finalizeCheckpoint} will be called at most once at some
+     * later point on the returned {@link CheckpointMark} object. Checkpoint finalization is
+     * best-effort, and checkpoints may not be finalized. If duplicate elements may be produced if
+     * checkpoints are not finalized in a timely manner, {@link UnboundedSource#requiresDeduping()}
+     * should be overridden to return true, and {@link UnboundedReader#getCurrentRecordId()} should
+     * be overridden to return unique record IDs.
      *
      * <p>The returned object should not be modified.
      *
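The updated Javadoc asks sources that may emit duplicates to return unique record IDs from UnboundedReader#getCurrentRecordId(), which returns a byte[]. The following is a minimal sketch of deduping on such IDs. Message, recordId(), and dedupe() are hypothetical and only approximate what a runner might do; this is not Beam's implementation. One detail worth noting: Java arrays use identity equality, so the IDs are wrapped in ByteBuffer, whose equals() and hashCode() compare the buffer contents.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RecordIdDedupe {

  /** Hypothetical message from a pull-ack queue; messageId is stable across redeliveries. */
  static final class Message {
    final String messageId;
    final String payload;

    Message(String messageId, String payload) {
      this.messageId = messageId;
      this.payload = payload;
    }
  }

  /** Plays the role of getCurrentRecordId(): the same bytes for every redelivery. */
  static byte[] recordId(Message m) {
    return m.messageId.getBytes(StandardCharsets.UTF_8);
  }

  /** Hypothetical runner-side dedupe: keeps the first occurrence of each record ID. */
  static List<String> dedupe(List<Message> delivered) {
    // A HashSet<byte[]> would compare array identity, so wrap IDs for content equality.
    Set<ByteBuffer> seen = new HashSet<>();
    List<String> out = new ArrayList<>();
    for (Message m : delivered) {
      if (seen.add(ByteBuffer.wrap(recordId(m)))) {
        out.add(m.payload);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // "id-1" is redelivered, e.g. after a checkpoint that was never finalized.
    List<Message> delivered = Arrays.asList(
        new Message("id-1", "a"),
        new Message("id-2", "b"),
        new Message("id-1", "a"),
        new Message("id-3", "c"));
    System.out.println(dedupe(delivered));
  }
}
```

Here main prints [a, b, c]. Deriving the ID from a queue-assigned message ID, rather than from the payload, keeps two distinct messages with identical contents from being collapsed into one.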