Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/25 22:09:03 UTC

[1/2] incubator-beam git commit: Clear PubsubUnboundedSource checkpoint state which is not needed for restore. Allows same checkpoint object to be passed to new reader without a serialize/deserialize step.

Repository: incubator-beam
Updated Branches:
  refs/heads/master bde2a856f -> d4c052c32


Clear PubsubUnboundedSource checkpoint state which is not needed for restore. Allows same checkpoint object to be passed to new reader without a serialize/deserialize step.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/521bfff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/521bfff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/521bfff4

Branch: refs/heads/master
Commit: 521bfff45273eeccd858e838f6748dc96607275f
Parents: 9f97ea0
Author: Mark Shields <ma...@google.com>
Authored: Wed May 25 14:21:31 2016 -0700
Committer: Mark Shields <ma...@google.com>
Committed: Wed May 25 14:22:17 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubUnboundedSource.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/521bfff4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 0492c76..e7634ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -233,7 +233,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   static class PubsubCheckpoint<T> implements UnboundedSource.CheckpointMark {
     /**
      * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting.
-     * If the checkpoint is for restoring: initially {@literal null}, then explicitly set.
+     * If the checkpoint is for restoring: {@literal null}.
      * Not persisted in durable checkpoint.
      * CAUTION: Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called
      * the 'true' active reader may have changed.
@@ -248,7 +248,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * Not persisted in durable checkpoint.
      */
     @Nullable
-    private final List<String> safeToAckIds;
+    private List<String> safeToAckIds;
 
     /**
      * If the checkpoint is for persisting: The ACK ids of messages which have been received
@@ -299,6 +299,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       } finally {
         checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0,
                    "Miscounted in-flight checkpoints");
+        reader = null;
+        safeToAckIds = null;
       }
     }
 

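The change above can be illustrated with a minimal, self-contained sketch. It is not the actual PubsubUnboundedSource code: the class names CheckpointSketch, Reader and Checkpoint, the field notYetReadIds, and the helpers ackBatch, snapshot and isClearedForRestore are hypothetical stand-ins chosen for illustration. Only reader, safeToAckIds, numInFlightCheckpoints and the finally-block clearing mirror the diff. The sketch assumes that finalizing a checkpoint ACKs the safe-to-ACK ids on the originating reader and then drops the two references that are not needed for restore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckpointSketch {

  /** Hypothetical stand-in for the reader; it only records ACKs and in-flight checkpoints. */
  static class Reader {
    final AtomicInteger numInFlightCheckpoints = new AtomicInteger();
    final List<String> acked = new ArrayList<>();
    final List<String> restoredNotYetRead = new ArrayList<>();

    Reader() {}

    /** Restore path: uses only the state that survives finalization. */
    Reader(Checkpoint restored) {
      restoredNotYetRead.addAll(restored.notYetReadIds);
    }

    void ackBatch(List<String> ackIds) {
      acked.addAll(ackIds);
    }

    Checkpoint snapshot(List<String> safeToAck, List<String> notYetRead) {
      numInFlightCheckpoints.incrementAndGet();
      return new Checkpoint(this, safeToAck, notYetRead);
    }
  }

  /** Hypothetical stand-in for the checkpoint mark. */
  static class Checkpoint {
    // Non-final so both can be cleared once the checkpoint is finalized.
    private Reader reader;
    private List<String> safeToAckIds;
    // State needed for restore (assumed name); kept after finalization.
    final List<String> notYetReadIds;

    Checkpoint(Reader reader, List<String> safeToAckIds, List<String> notYetReadIds) {
      this.reader = reader;
      this.safeToAckIds = new ArrayList<>(safeToAckIds);
      this.notYetReadIds = new ArrayList<>(notYetReadIds);
    }

    void finalizeCheckpoint() {
      try {
        reader.ackBatch(safeToAckIds);
      } finally {
        if (reader.numInFlightCheckpoints.decrementAndGet() < 0) {
          throw new IllegalStateException("Miscounted in-flight checkpoints");
        }
        // Drop state that is only meaningful while persisting.
        reader = null;
        safeToAckIds = null;
      }
    }

    boolean isClearedForRestore() {
      return reader == null && safeToAckIds == null;
    }
  }

  public static void main(String[] args) {
    Reader original = new Reader();
    Checkpoint checkpoint =
        original.snapshot(Arrays.asList("ack-1", "ack-2"), Arrays.asList("ack-3"));
    checkpoint.finalizeCheckpoint();

    // The same object is handed to a new reader, with no serialize/deserialize step.
    Reader replacement = new Reader(checkpoint);
    System.out.println("cleared: " + checkpoint.isClearedForRestore());
    System.out.println("restored ids: " + replacement.restoredNotYetRead);
  }
}
```

With the fields cleared in the finally block, a finalized checkpoint looks the same to a new reader whether it arrived through the durable (serialized) path or was passed directly in memory, which is the property the commit message describes.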

[2/2] incubator-beam git commit: This closes #388

Posted by ke...@apache.org.
This closes #388


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4c052c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4c052c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4c052c3

Branch: refs/heads/master
Commit: d4c052c32be8d5635b32fcdc214724a399b09c55
Parents: bde2a85 521bfff
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 25 14:52:07 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 25 14:52:07 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/PubsubUnboundedSource.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------