You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/04 22:39:47 UTC

[2/2] beam git commit: Make Pubsub ackBatch safer

Make Pubsub ackBatch safer

Checkpoints can be finalized at any point, including while the reader is
being closed or after it has been closed.

Calling close() marks the PubsubReader as inactive. If the reader has no
in-flight checkpoints, its PubsubClient is closed immediately. Otherwise,
the client is closed when the reader's last outstanding CheckpointMark is
finalized.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c6c32bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c6c32bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c6c32bb

Branch: refs/heads/master
Commit: 9c6c32bb11ef80c3cbb744e23e4b8c312592a3f7
Parents: 65ffd6c
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 29 19:26:06 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 4 15:39:33 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/PubsubUnboundedSource.java      | 74 ++++++++++++++------
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  | 13 ++++
 2 files changed, 65 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c6c32bb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 1184968..0e6bec8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -42,7 +42,9 @@ import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -312,8 +314,10 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
           reader.ackBatch(batchSafeToAckIds);
         }
       } finally {
-        checkState(reader.numInFlightCheckpoints.decrementAndGet() >= 0,
+        int remainingInFlight = reader.numInFlightCheckpoints.decrementAndGet();
+        checkState(remainingInFlight >= 0,
                    "Miscounted in-flight checkpoints");
+        reader.maybeCloseClient();
         reader = null;
         safeToAckIds = null;
       }
@@ -398,10 +402,15 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
 
     /**
-     * Client on which to talk to Pubsub. Null if closed.
+     * Client on which to talk to Pubsub. Contains a null value if the client has been closed.
      */
-    @Nullable
-    private PubsubClient pubsubClient;
+    private AtomicReference<PubsubClient> pubsubClient;
+
+    /**
+     * Whether this {@link PubsubReader} is still active. If true, the reader has not yet been
+     * closed, and {@link #pubsubClient} holds a non-null client.
+     */
+    private AtomicBoolean active = new AtomicBoolean(true);
 
     /**
      * Ack timeout, in ms, as set on subscription when we first start reading. Not
@@ -590,8 +599,9 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       this.subscription = subscription;
       this.parseFn = parseFn;
       pubsubClient =
-          outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel,
-                                              options);
+          new AtomicReference<>(
+              outer.outer.pubsubFactory.newClient(
+                  outer.outer.timestampLabel, outer.outer.idLabel, options));
       ackTimeoutMs = -1;
       safeToAckIds = new HashSet<>();
       notYetRead = new ArrayDeque<>();
@@ -626,17 +636,19 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
     @VisibleForTesting
     PubsubClient getPubsubClient() {
-      return pubsubClient;
+      return pubsubClient.get();
     }
 
     /**
-     * BLOCKING
-     * ACK {@code ackIds} back to Pubsub.
-     * CAUTION: May be invoked from a separate checkpointing thread.
-     * CAUTION: Retains {@code ackIds}.
+     * Acks the provided {@code ackIds} back to Pubsub, blocking until all of the messages are
+     * ACKed.
+     *
+     * <p>CAUTION: May be invoked from a separate thread.
+     *
+     * <p>CAUTION: Retains {@code ackIds}.
      */
     void ackBatch(List<String> ackIds) throws IOException {
-      pubsubClient.acknowledge(subscription, ackIds);
+      pubsubClient.get().acknowledge(subscription, ackIds);
       ackedIds.add(ackIds);
     }
 
@@ -646,7 +658,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      * with the given {@code ockIds}. Does not retain {@code ackIds}.
      */
     public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
-      pubsubClient.modifyAckDeadline(subscription, ackIds, 0);
+      pubsubClient.get().modifyAckDeadline(subscription, ackIds, 0);
       numNacked.add(nowMsSinceEpoch, ackIds.size());
     }
 
@@ -657,7 +669,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
      */
     private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
       int extensionSec = (ackTimeoutMs * ACK_EXTENSION_PCT) / (100 * 1000);
-      pubsubClient.modifyAckDeadline(subscription, ackIds, extensionSec);
+      pubsubClient.get().modifyAckDeadline(subscription, ackIds, extensionSec);
       numExtendedDeadlines.add(nowMsSinceEpoch, ackIds.size());
     }
 
@@ -792,9 +804,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       // Pull the next batch.
       // BLOCKs until received.
       Collection<PubsubClient.IncomingMessage> receivedMessages =
-          pubsubClient.pull(requestTimeMsSinceEpoch,
-                            subscription,
-                            PULL_BATCH_SIZE, true);
+          pubsubClient.get().pull(requestTimeMsSinceEpoch, subscription, PULL_BATCH_SIZE, true);
       if (receivedMessages.isEmpty()) {
         // Nothing available yet. Try again later.
         return;
@@ -899,7 +909,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     @Override
     public boolean start() throws IOException {
       // Determine the ack timeout.
-      ackTimeoutMs = pubsubClient.ackDeadlineSeconds(subscription) * 1000;
+      ackTimeoutMs = pubsubClient.get().ackDeadlineSeconds(subscription) * 1000;
       return advance();
     }
 
@@ -991,11 +1001,31 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       return current.recordId.getBytes(Charsets.UTF_8);
     }
 
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>Marks this {@link PubsubReader} as no longer active. The {@link PubsubClient} may
+     * remain open beyond the life of this call if there are any in-flight checkpoints. When no
+     * in-flight checkpoints remain, the client will be closed.
+     */
     @Override
     public void close() throws IOException {
-      if (pubsubClient != null) {
-        pubsubClient.close();
-        pubsubClient = null;
+      active.set(false);
+      maybeCloseClient();
+    }
+
+    /**
+     * Closes this reader's underlying {@link PubsubClient} if the reader has been closed and there
+     * are no outstanding checkpoints.
+     */
+    private void maybeCloseClient() throws IOException {
+      if (!active.get() && numInFlightCheckpoints.get() == 0) {
+        // The reader has been closed and has no outstanding checkpoints, so close the client
+        // now to avoid leaking it.
+        PubsubClient client = pubsubClient.getAndSet(null);
+        if (client != null) {
+          client.close();
+        }
       }
     }
 
@@ -1006,7 +1036,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
     @Override
     public Instant getWatermark() {
-      if (pubsubClient.isEOF() && notYetRead.isEmpty()) {
+      if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) {
         // For testing only: Advance the watermark to the end of time to signal
         // the test is complete.
         return BoundedWindow.TIMESTAMP_MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/beam/blob/9c6c32bb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index 478ecd1..d9df2ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -395,4 +395,17 @@ public class PubsubUnboundedSourceTest {
     assertThat(readerFromOriginal.subscription, equalTo(createdSubscription));
     assertThat(readerFromDeser.subscription, equalTo(createdSubscription));
   }
+
+  /**
+   * Tests that checkpoints finalized after the reader is closed succeed.
+   */
+  @Test
+  public void closeWithActiveCheckpoints() throws Exception {
+    setupOneMessage();
+    PubsubReader<String> reader = primSource.createReader(p.getOptions(), null);
+    reader.start();
+    PubsubCheckpoint<String> checkpoint = reader.getCheckpointMark();
+    reader.close();
+    checkpoint.finalizeCheckpoint();
+  }
 }